对网络设备进行批量操作的几个脚本

This is an article that was created 438 days ago, and the information may have evolved or changed.

对网络设备进行批量操作的几个脚本

netmiko基操

32位windows下偷个懒使用

在32位windows系统中无法使用之前编写的nornir脚本,原因是有几个第三方库从某个版本开始不再开发和维护32位系统的代码,而库的依赖最低版本大于最后发布的32位版本,或者装不上,亦或装上了无法用。So……

github仓库地址:https://github.com/kiraster/netops_alpha_v1.1

部分代码源码来自这位大佬:点滴技术的个人空间_哔哩哔哩_bilibili

有需要的自行研究哈,有建设性的好玩的想法的,欢迎来函探讨

如要使用试用把玩,建议虚拟环境下运行

add_config.py

批量对设备添加相同配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import os
from settings import *


# 执行配置命令
def run_cmd(task_name, host, cmds, enable=False):

enable = True if host['secret'] else False
flag, conn = connect_handler(host)
try:
if flag:
# 从返回提示符获取设备名称
hostname = conn.find_prompt().replace('<', '').replace('>', '').replace('#', '').strip()
print('正在为设备[{}]添加配置……'.format(hostname))
# 文件保存路径和文件名 '当前目录\\EXPORT\\当天日期\\config_add\\hostname+ip+当前时间.txt'
logtime = datetime.now().strftime("%H%M%S")
output_filename = hostname + '_' + host['ip'] + '_' + logtime + '.txt'

if cmds:
# 判断单元表里命令是否为空值
output = ''
if enable:
# 判断是否需要进入enable特权模式
conn.enable()
output += conn.send_config_set(config_commands=cmds)

else:
output += conn.send_config_set(config_commands=cmds)
else:
# 使用ftp/tftp/sftp/scp
pass
conn.disconnect()

# write_data
write_to_file(task_name, output_filename, output)
sucessful_list.append(host['ip'] + ' ' + hostname)
else:
failed_list.append(conn)
pass

except Exception as e:
print(f"run_cmd Failed: {e}")

@timer
@result_write
@result_count
# 代码运行主体框架设计
def main():
try:
# 获取当前运行的Python文件的路径
current_file_path = os.path.abspath(__file__)
# 提取文件名
task_name = os.path.basename(current_file_path)
print(f'\n当前执行的脚本是[{task_name}],程序正在执行中>>>\n')

hosts = get_device_info(task_name)
pool = t_pool
# hosts是一个返回的生成器,需要进行循环遍历
for host in hosts:
# 单线程同步输出方式执行
# run_cmd(host, host['cmd_list'])
# 多线程异步处理
pool.apply_async(run_cmd, args=(task_name, host, host['cmd_list']))
pool.close()
pool.join()

return task_name, sucessful_list, failed_list

except Exception:
print('Something Wrong!')


if __name__ == "__main__":
main()

backup_config.py

批量导出设备配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import os
from settings import *


# 执行display/show命令
def run_cmd(task_name, host, cmds, enable=False):

enable = True if host['secret'] else False
flag, conn = connect_handler(host)
try:
if flag:
# 从返回提示符获取设备名称
hostname = conn.find_prompt().replace('<', '').replace('>', '').replace('#', '').strip()
print("正在获取设备[{}]的配置……".format(hostname))
# 文件保存路径和文件名 '当前目录\\EXPORT\\当天日期\\config_backup\\hostname+ip+当前时间.txt'
logtime = datetime.now().strftime("%H%M%S")
output_filename = hostname + '_' + host['ip'] + '_' + logtime + '.txt'

if cmds:
# 判断单元表里命令是否为空值
output = ''
for cmd in cmds:
if enable:
# 判断是否需要进入enable特权模式
conn.enable()
output += '\n' + '=' * 100 + '\n' + cmd.center(100, '=') + '\n'
output += conn.send_command(command_string=cmd)

else:
output += '\n' + '=' * 100 + '\n' + cmd.center(100, '=') + '\n'
output += conn.send_command(command_string=cmd)
else:
# 使用ftp/tftp/sftp/scp
pass
conn.disconnect()

# write_data
write_to_file(task_name, output_filename, output)
sucessful_list.append(host['ip'] + ' ' + hostname)
else:
failed_list.append(conn)
pass

except Exception as e:
print(f"run_cmd Failed: {e}")

@timer
@result_write
@result_count
# 代码运行主体框架设计
def main():
try:
# 获取当前运行的Python文件的路径
current_file_path = os.path.abspath(__file__)
# 提取文件名
task_name = os.path.basename(current_file_path)
print(f'\n当前执行的脚本是[{task_name}],程序正在执行中>>>\n')

hosts = get_device_info(task_name)
pool = t_pool
# hosts是一个返回的生成器,需要进行循环遍历
for host in hosts:
# 单线程同步输出方式执行
# run_cmd(host, host['cmd_list'])
# 多线程异步处理
pool.apply_async(run_cmd, args=(task_name, host, host['cmd_list']))
pool.close()
pool.join()

return task_name, sucessful_list, failed_list

except Exception:
print('Something Wrong!')


if __name__ == "__main__":
main()

ping.py

批量对设备进行ping测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import os
import ping3
import time
from settings import *


# 批量ping测试
def ping_test(host):

ping3.EXCEPTIONS = True
time.sleep(2)
try:
ping3.ping(host['ip'])
res = "{:<18}ping测试成功.".format(host['ip'])
print(res)
sucessful_list.append(host['ip'])
except ping3.errors.HostUnknown:
res = "{:<18}ping测试失败. Host unknown error raised.".format(host['ip'])
print(res)
failed_list.append(host['ip'])
except ping3.errors.PingError:
res = "{:<18}ping测试失败. A ping error raised.".format(host['ip'])
print(res)
failed_list.append(host['ip'])


@timer
@result_write
@result_count
# 代码运行主体框架设计
def main():
try:
# 获取当前运行的Python文件的路径
current_file_path = os.path.abspath(__file__)
# 提取文件名
task_name = os.path.basename(current_file_path)
print(f'\n当前执行的脚本是[{task_name}],程序正在执行中>>>\n')

hosts = get_device_info(task_name)
pool = t_pool
# hosts是一个返回的生成器,需要进行循环遍历
for host in hosts:
# 单线程同步输出方式执行
# run_cmd(host, host['cmd_list'])
# 多线程异步处理
pool.apply_async(ping_test, args=(host,))
pool.close()
pool.join()

return task_name, sucessful_list, failed_list

except Exception:
print('Something Wrong!')


if __name__ == "__main__":
main()

ssh_connect_test.py

批量对设备进行ssh连接测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import os
from settings import *


# SSH测试连接设备
def connect_test(host):

try:
flag, conn = connect_handler(host)
if flag:
# 获取到设备名称则表示ssh连接测试成功
hostname = conn.find_prompt()
result = '{} SSH测试连接成功,获取到设备提示符: {}'.format(host['ip'], hostname)
print(result)
conn.disconnect()
sucessful_list.append(result)
else:
# SSH连接测试失败,同记录
result = '{} SSH测试连接失败,未获取到设备提示符'.format(host['ip'])
failed_list.append(result)

except Exception as e:
print("connect_test Failed: {}".format(e))
failed_list.append(host['ip'])


@timer
@result_write
@result_count
# 代码运行主体框架设计
def main():
try:
# 获取当前运行的Python文件的路径
current_file_path = os.path.abspath(__file__)
# 提取文件名
task_name = os.path.basename(current_file_path)
print(f'\n当前执行的脚本是[{task_name}],程序正在执行中>>>\n')

hosts = get_device_info(task_name)
pool = t_pool
# hosts是一个返回的生成器,需要进行循环遍历
for host in hosts:
# 单线程同步输出方式执行
# run_cmd(host, host['cmd_list'])
# 多线程异步处理
pool.apply_async(connect_test, args=(host,))
pool.close()
pool.join()

return task_name, sucessful_list, failed_list

except Exception:
print('Something Wrong!')


if __name__ == "__main__":
main()

undifined.py

批量对设备添加不同配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import os
from settings import *


# 执行配置命令
def run_cmd(task_name, host, cmds, enable=False):

enable = True if host['secret'] else False
flag, conn = connect_handler(host)
try:
if flag:
# 从返回提示符获取设备名称
hostname = conn.find_prompt().replace('<', '').replace('>', '').replace('#', '').strip()
print('正在为设备[{}]添加配置……'.format(hostname))
# 文件保存路径和文件名 '当前目录\\EXPORT\\当天日期\\config_add\\hostname+ip+当前时间.txt'
logtime = datetime.now().strftime("%H%M%S")
output_filename = hostname + '_' + host['ip'] + '_' + logtime + '.txt'

if cmds:
# 判断单元表里命令是否为空值
output = ''
if enable:
# 判断是否需要进入enable特权模式
conn.enable()
output += conn.send_config_set(config_commands=cmds)

else:
output += conn.send_config_set(config_commands=cmds)
else:
pass
conn.disconnect()

# write_data
write_to_file(task_name, output_filename, output)
sucessful_list.append(host['ip'] + ' ' + hostname)
else:
failed_list.append(conn)
pass

except Exception as e:
print(f"run_cmd Failed: {e}")


@timer
@result_write
@result_count
# 代码运行主体框架设计
def main():
try:
# 获取当前运行的Python文件的路径
current_file_path = os.path.abspath(__file__)
# 提取文件名
task_name = os.path.basename(current_file_path)
print(f'\n当前执行的脚本是[{task_name}],程序正在执行中>>>\n')

hosts = get_undifined_device_info()
pool = t_pool
# hosts是一个返回的生成器,需要进行循环遍历
for host in hosts:
# 单线程同步输出方式执行
# run_cmd(host, host['cmd_list'])
# 多线程异步处理
pool.apply_async(run_cmd, args=(task_name, host, host['cmd_list']))
pool.close()
pool.join()

return task_name, sucessful_list, failed_list

except Exception:
print('Something Wrong!')


if __name__ == "__main__":
main()

settings.py

定义一些参数、装饰器、共用函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
'''
全局设置和共用函数
'''
import os
from datetime import datetime
from functools import wraps
import ipaddress
from openpyxl import load_workbook
from multiprocessing.pool import ThreadPool
from netmiko import ConnectHandler, NetMikoAuthenticationException, NetMikoTimeoutException, ssh_exception


# 项目根目录
BASE_PATH = os.path.dirname((__file__))

# 文件输出目录
EXPORT_PATH = os.path.join(BASE_PATH, 'EXPORT')

# 定义目录名称为当天日期(格式:20220609)
dir_name = datetime.now().strftime("%Y%m%d")

# 目录创建
new_path = os.path.join(EXPORT_PATH, dir_name)
if not os.path.isdir(new_path):
os.makedirs(new_path)

backup_path = os.path.normpath(
os.path.join(EXPORT_PATH, dir_name, 'config_backup'))
config_path = os.path.normpath(
os.path.join(EXPORT_PATH, dir_name, 'config_add'))
# generate_table = os.path.normpath(
# os.path.join(EXPORT_PATH, dir_name, 'generate_table'))

if not os.path.isdir(backup_path):
os.makedirs(backup_path)
if not os.path.isdir(config_path):
os.makedirs(config_path)
# if not os.path.isdir(generate_table):
# os.makedirs(generate_table)

# 表格数据文件路径
device_file = "dev_data.xlsx"

# ThreadPool 设定异步进程数为66
t_pool = ThreadPool(66)

# 定义列表保存执行成功和失败的主机IP
sucessful_list = []
failed_list = []


# 手动输入登陆信息
def login_start():
# login module,校验由netmiko完成
# login_user = input('Login:')
# login_pwd = getpass.getpass('Passwd:')
# # 明文写死在代码,不安全的方式
# login_user = 'admin'
# login_pwd = 'xxx'
# return login_user, login_pwd
pass


# 加载excel文件
def load_excel():
try:
wb = load_workbook(device_file)
return wb
except FileNotFoundError:
print("{} excel文件不存在".format(device_file))
except Exception:
print("载入读取{} excel文件失败".format(device_file))


# 获取设备数据信息
def get_device_info(task_name):
try:
# by openpyxl
# user, pwd = login_start()
wb = load_excel()
ws1 = wb[wb.sheetnames[0]]
# 选定单元格数据区域
for row in ws1.iter_rows(min_row=2, max_col=9):
# 判断IP所在的列不为空值,则进行如下代码
if row[2].value:
if str(row[1].value).strip() == '#':
continue
info_dict = {
'ip':
row[2].value,
'username':
row[5].value,
'password':
row[6].value,
'protocol':
row[3].value,
'port':
row[4].value,
'secret':
row[7].value,
'device_type':
row[8].value,
'cmd_list':
get_cmd_info(task_name, wb[row[8].value.strip().lower()]),
}
yield info_dict
else:
break
except Exception as e:
print("get_device_info failed: {}".format(e))
finally:
wb.close()


# 获取命令信息
def get_cmd_info(task_name, sheet_name):
cmd_list = []
try:
# by openpyxl
for row in sheet_name.iter_rows(min_row=2, max_col=3):
# 若单元格使用“#”进行注释或命令为空值,则跳过该行
if str(row[0].value).strip() != '#' and row[1].value and task_name == 'backup_config.py':
cmd_list.append(row[1].value.strip())
elif str(row[0].value).strip() != '#' and row[2].value and task_name == 'add_config.py':
cmd_list.append(row[2].value.strip())
return cmd_list
except Exception as e:
print("get_cmd_info Error: ", e)


# 获取自定义设备数据信息
def get_undifined_device_info():
try:
# by openpyxl
wb = load_excel()
ws1 = wb[wb.sheetnames[5]]
row_number = 2
# 选定单元格数据区域
for row in ws1.iter_rows(min_row=2, max_col=9):
# 获取执行命令
undifined_cmd_info = []
for cols in ws1.iter_cols(min_col=10,
min_row=row_number,
max_row=row_number,
values_only=True):
for col in cols:
if col is None:
continue
undifined_cmd_info.append(col)
row_number += 1

# 判断IP所在的列不为空值,执行如下代码
if row[2].value:
if str(row[1].value).strip() == '#':
continue
info_dict = {
'ip':
row[2].value,
'username':
row[5].value,
'password':
row[6].value,
'protocol':
row[3].value,
'port':
row[4].value,
'secret':
row[7].value,
'device_type':
row[8].value,
'cmd_list': undifined_cmd_info,
}
yield info_dict
else:
break
except Exception as e:
print("get_device_info failed: {}".format(e))
finally:
wb.close()


# netmiko 连接处理
def connect_handler(host):
try:
connect = ''
# 判断使用SSH协议
# 将“protocol”列单元格的内容大写转小写,去除前后空格对比是否为 ssh
if host['protocol'].lower().strip() == 'ssh':
# 判断“port”列单元格,若单元格填写的内容不是22或空,则定义为22
host['port'] = host['port'] if (host['port'] not in [22, None]) else 22
# 剔除多余connectHandler 不需要的参数,protocol、secret、cmd_list,华为华为设备无特权密码
host.pop('protocol'), host.pop('cmd_list')

if 'huawei' in host['device_type']:
host.pop('secret')
connect = ConnectHandler(**host, conn_timeout=10)
elif 'hp_comware' in host['device_type']:
host.pop('secret')
connect = ConnectHandler(**host)
else:
connect = ConnectHandler(**host, conn_timeout=10)
# 判断使用Telnet协议
elif host['protocol'].lower().strip() == 'telnet':
# 判断“port”列单元格,若单元格填写的内容不是23或空,则定义为23
host['port'] = host['port'] if (host['port'] not in [23, None]) else 23
# 剔除多余connectHandler 不需要的参数,protocol、secret、cmd_list,华三华为设备无特权密码
host.pop('protocol'), host.pop('secret'), host.pop('cmd_list')
# netmiko 支持telnet协议,设备类型格式为:hp_comware_telnet
host['device_type'] = host['device_type'] + '_telnet'
# fast_cli=false,待测试参数
connect = ConnectHandler(**host, fast_cli=False)
# 不在以上两种协议内的连接
else:
res = "暂不支持IP地址为{}_的设备使用{}协议登陆".format(host['ip'], host['protocol'])
raise ValueError(res)
return True, connect

except NetMikoTimeoutException:
res = "{} Can not connect to Device!".format(host['ip'])
print(res)
return False, res
except NetMikoAuthenticationException:
res = "{} username/password wrong!".format(host['ip'])
print(res)
return False, res
except ssh_exception:
res = "{} SSH parameter problem!".format(host['ip'])
print(res)
return False, res
except Exception as e:
print("{} Failed: {}".format(host['ip'], e))
return False, res


# 记录程序执行时间装饰器
def timer(func):

@wraps(func)
def wrapper(*args, **kwargs):
start_time = datetime.now()
func(*args, **kwargs)
end_time = datetime.now()
# print('\n' + '-' * 42)
print('执行完毕,共耗时 {:0.2f} 秒.'.format((end_time - start_time).total_seconds()))
print('-' * 42)
# return res

return wrapper


# 记录运行结果的装饰器
def result_count(func):

@wraps(func)
def wrapper(*args, **kwargs):

task_name, sucessful_list, failed_list = func(*args, **kwargs)
result_count = ('设备总数 {} 台,成功 {} 台,失败 {} 台.'.format(
len(sucessful_list) + len(failed_list), len(sucessful_list), len(failed_list)))
print('\n' + '-' * 42)
print(result_count)
result_path = os.path.normpath(os.path.join(EXPORT_PATH, dir_name, f'result_{dir_name}.log'))
print(f'\n运行结果保存路径: \"{result_path}\"\n')

return task_name, sucessful_list, failed_list

return wrapper


# 写入文件
def write_to_file(task_name, output_filename, output):
# 写入结果到文件
if task_name == 'backup_config.py':
with open(os.path.join(backup_path, output_filename), 'a', encoding="utf-8") as f:
f.write(output)
elif task_name == 'add_config.py':
with open(os.path.join(config_path, output_filename), 'a', encoding="utf-8") as f:
f.write(output)
elif task_name == 'undifined.py':
with open(os.path.join(config_path, output_filename), 'a', encoding="utf-8") as f:
f.write(output)
else:
pass


# 保存运行结果记录的装饰器
def result_write(func):

@wraps(func)
def wrapper(*args, **kwargs):

task_name, sucessful_list, failed_list = func(*args, **kwargs)
result_count = ('设备总数 {} 台,成功 {} 台,失败 {} 台.'.format(
len(sucessful_list) + len(failed_list), len(sucessful_list), len(failed_list)))

# time_str = datetime.now()
time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')

result_path = os.path.normpath(
os.path.join(EXPORT_PATH, dir_name, f'result_{dir_name}.log'))

with open(result_path, 'a', encoding="utf-8") as f:
log_title = task_name.center(100, '=') + '\n' + time_str.center(100, '=') + '\n'
f.write(log_title)
f.write(result_count + '\n')
f.write('\n执行成功设备列表:\n')
for i in sucessful_list:
f.write(i)
f.write('\n')

f.write('\nNG设备列表:\n')
for i in failed_list:
f.write(i)
f.write('\n')
f.write('\n')

return task_name, sucessful_list, failed_list

return wrapper


# 判断是否是正确格式的IP地址,IP地址网络,IP地址范围
def is_valid_ipv4_input(ipv4_str):

try:
# 尝试将输入解析为 IPv4Address
ipaddress.IPv4Address(ipv4_str)
return True
except ValueError:
return False
# try:
# # 将输入拆分为两个 IP 地址
# start, end = ipv4_str.split('-')
# # 尝试将输入解析为 IPv4Address
# ipaddress.IPv4Address(start.strip())
# ipaddress.IPv4Address(end.strip())
# return True
# except ValueError as e:
# # 尝试将输入解析为 IPv4Network
# try:
# network = ipaddress.IPv4Network(ipv4_str)
# if network.hostmask != '0.0.0.0':
# return True
# else:
# return False
# except ValueError as e:
# # print(e)
# return False

dev_data.xlsx

定义设备登陆参数和命令

ScreenCaputure230709135002

requirements.txt

使用到的第三方库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bcrypt==4.0.1
cffi==1.15.1
cryptography==41.0.1
et-xmlfile==1.1.0
future==0.18.3
importlib-metadata==6.8.0
netmiko==3.4.0
ntc-templates==3.4.0
openpyxl==3.1.2
paramiko==3.2.0
ping3==4.0.4
platformdirs==3.8.1
pycparser==2.21
PyNaCl==1.5.0
pyserial==3.5
PyYAML==6.0
scp==0.14.5
six==1.16.0
tenacity==8.2.2
textfsm==1.1.3
tomli==2.0.1
yapf==0.40.1
zipp==3.15.0

信锐NMC对接华为AC实现授权ACL下发 GPT问答230615
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×