对网络设备进行批量操作的几个脚本
netmiko基操
32位windows下偷个懒使用
在32位windows系统中无法使用之前编写的nornir脚本,原因是有几个第三方库从某个版本开始不再开发和维护32位系统的代码,而库的依赖最低版本大于最后发布的32位版本,或者装不上,亦或装上了无法用。So……
github仓库地址:https://github.com/kiraster/netops_alpha_v1.1
部分代码源码来自这位大佬:点滴技术的个人空间_哔哩哔哩_bilibili
有需要的自行研究哈,有建设性的好玩的想法的,欢迎来函探讨
如要使用试用把玩,建议虚拟环境下运行
add_config.py
批量对设备添加相同配置
import os
from settings import *
# 执行配置命令
def run_cmd(task_name, host, cmds, enable=False):
enable = True if host['secret'] else False
flag, conn = connect_handler(host)
try:
if flag:
# 从返回提示符获取设备名称
hostname = conn.find_prompt().replace('<', '').replace('>', '').replace('#', '').strip()
print('正在为设备[{}]添加配置……'.format(hostname))
# 文件保存路径和文件名 '当前目录\\EXPORT\\当天日期\\config_add\\hostname+ip+当前时间.txt'
logtime = datetime.now().strftime("%H%M%S")
output_filename = hostname + '_' + host['ip'] + '_' + logtime + '.txt'
if cmds:
# 判断单元表里命令是否为空值
output = ''
if enable:
# 判断是否需要进入enable特权模式
conn.enable()
output += conn.send_config_set(config_commands=cmds)
else:
output += conn.send_config_set(config_commands=cmds)
else:
# 使用ftp/tftp/sftp/scp
pass
conn.disconnect()
# write_data
write_to_file(task_name, output_filename, output)
sucessful_list.append(host['ip'] + ' ' + hostname)
else:
failed_list.append(conn)
pass
except Exception as e:
print(f"run_cmd Failed: {e}")
@timer
@result_write
@result_count
# 代码运行主体框架设计
def main():
try:
# 获取当前运行的Python文件的路径
current_file_path = os.path.abspath(__file__)
# 提取文件名
task_name = os.path.basename(current_file_path)
print(f'\n当前执行的脚本是[{task_name}],程序正在执行中>>>\n')
hosts = get_device_info(task_name)
pool = t_pool
# hosts是一个返回的生成器,需要进行循环遍历
for host in hosts:
# 单线程同步输出方式执行
# run_cmd(host, host['cmd_list'])
# 多线程异步处理
pool.apply_async(run_cmd, args=(task_name, host, host['cmd_list']))
pool.close()
pool.join()
return task_name, sucessful_list, failed_list
except Exception:
print('Something Wrong!')
if __name__ == "__main__":
main()
backup_config.py
批量导出设备配置
import os
from settings import *
# 执行display/show命令
def run_cmd(task_name, host, cmds, enable=False):
enable = True if host['secret'] else False
flag, conn = connect_handler(host)
try:
if flag:
# 从返回提示符获取设备名称
hostname = conn.find_prompt().replace('<', '').replace('>', '').replace('#', '').strip()
print("正在获取设备[{}]的配置……".format(hostname))
# 文件保存路径和文件名 '当前目录\\EXPORT\\当天日期\\config_backup\\hostname+ip+当前时间.txt'
logtime = datetime.now().strftime("%H%M%S")
output_filename = hostname + '_' + host['ip'] + '_' + logtime + '.txt'
if cmds:
# 判断单元表里命令是否为空值
output = ''
for cmd in cmds:
if enable:
# 判断是否需要进入enable特权模式
conn.enable()
output += '\n' + '=' * 100 + '\n' + cmd.center(100, '=') + '\n'
output += conn.send_command(command_string=cmd)
else:
output += '\n' + '=' * 100 + '\n' + cmd.center(100, '=') + '\n'
output += conn.send_command(command_string=cmd)
else:
# 使用ftp/tftp/sftp/scp
pass
conn.disconnect()
# write_data
write_to_file(task_name, output_filename, output)
sucessful_list.append(host['ip'] + ' ' + hostname)
else:
failed_list.append(conn)
pass
except Exception as e:
print(f"run_cmd Failed: {e}")
@timer
@result_write
@result_count
# 代码运行主体框架设计
def main():
try:
# 获取当前运行的Python文件的路径
current_file_path = os.path.abspath(__file__)
# 提取文件名
task_name = os.path.basename(current_file_path)
print(f'\n当前执行的脚本是[{task_name}],程序正在执行中>>>\n')
hosts = get_device_info(task_name)
pool = t_pool
# hosts是一个返回的生成器,需要进行循环遍历
for host in hosts:
# 单线程同步输出方式执行
# run_cmd(host, host['cmd_list'])
# 多线程异步处理
pool.apply_async(run_cmd, args=(task_name, host, host['cmd_list']))
pool.close()
pool.join()
return task_name, sucessful_list, failed_list
except Exception:
print('Something Wrong!')
if __name__ == "__main__":
main()
ping.py
批量对设备进行ping测试
import os
import ping3
import time
from settings import *
# 批量ping测试
def ping_test(host):
ping3.EXCEPTIONS = True
time.sleep(2)
try:
ping3.ping(host['ip'])
res = "{:<18}ping测试成功.".format(host['ip'])
print(res)
sucessful_list.append(host['ip'])
except ping3.errors.HostUnknown:
res = "{:<18}ping测试失败. Host unknown error raised.".format(host['ip'])
print(res)
failed_list.append(host['ip'])
except ping3.errors.PingError:
res = "{:<18}ping测试失败. A ping error raised.".format(host['ip'])
print(res)
failed_list.append(host['ip'])
@timer
@result_write
@result_count
# 代码运行主体框架设计
def main():
try:
# 获取当前运行的Python文件的路径
current_file_path = os.path.abspath(__file__)
# 提取文件名
task_name = os.path.basename(current_file_path)
print(f'\n当前执行的脚本是[{task_name}],程序正在执行中>>>\n')
hosts = get_device_info(task_name)
pool = t_pool
# hosts是一个返回的生成器,需要进行循环遍历
for host in hosts:
# 单线程同步输出方式执行
# run_cmd(host, host['cmd_list'])
# 多线程异步处理
pool.apply_async(ping_test, args=(host,))
pool.close()
pool.join()
return task_name, sucessful_list, failed_list
except Exception:
print('Something Wrong!')
if __name__ == "__main__":
main()
ssh_connect_test.py
批量对设备进行ssh连接测试
import os
from settings import *
# SSH测试连接设备
def connect_test(host):
try:
flag, conn = connect_handler(host)
if flag:
# 获取到设备名称则表示ssh连接测试成功
hostname = conn.find_prompt()
result = '{} SSH测试连接成功,获取到设备提示符: {}'.format(host['ip'], hostname)
print(result)
conn.disconnect()
sucessful_list.append(result)
else:
# SSH连接测试失败,同记录
result = '{} SSH测试连接失败,未获取到设备提示符'.format(host['ip'])
failed_list.append(result)
except Exception as e:
print("connect_test Failed: {}".format(e))
failed_list.append(host['ip'])
@timer
@result_write
@result_count
# 代码运行主体框架设计
def main():
try:
# 获取当前运行的Python文件的路径
current_file_path = os.path.abspath(__file__)
# 提取文件名
task_name = os.path.basename(current_file_path)
print(f'\n当前执行的脚本是[{task_name}],程序正在执行中>>>\n')
hosts = get_device_info(task_name)
pool = t_pool
# hosts是一个返回的生成器,需要进行循环遍历
for host in hosts:
# 单线程同步输出方式执行
# run_cmd(host, host['cmd_list'])
# 多线程异步处理
pool.apply_async(connect_test, args=(host,))
pool.close()
pool.join()
return task_name, sucessful_list, failed_list
except Exception:
print('Something Wrong!')
if __name__ == "__main__":
main()
undifined.py
批量对设备添加不同配置
import os
from settings import *
# 执行配置命令
def run_cmd(task_name, host, cmds, enable=False):
enable = True if host['secret'] else False
flag, conn = connect_handler(host)
try:
if flag:
# 从返回提示符获取设备名称
hostname = conn.find_prompt().replace('<', '').replace('>', '').replace('#', '').strip()
print('正在为设备[{}]添加配置……'.format(hostname))
# 文件保存路径和文件名 '当前目录\\EXPORT\\当天日期\\config_add\\hostname+ip+当前时间.txt'
logtime = datetime.now().strftime("%H%M%S")
output_filename = hostname + '_' + host['ip'] + '_' + logtime + '.txt'
if cmds:
# 判断单元表里命令是否为空值
output = ''
if enable:
# 判断是否需要进入enable特权模式
conn.enable()
output += conn.send_config_set(config_commands=cmds)
else:
output += conn.send_config_set(config_commands=cmds)
else:
pass
conn.disconnect()
# write_data
write_to_file(task_name, output_filename, output)
sucessful_list.append(host['ip'] + ' ' + hostname)
else:
failed_list.append(conn)
pass
except Exception as e:
print(f"run_cmd Failed: {e}")
@timer
@result_write
@result_count
# 代码运行主体框架设计
def main():
try:
# 获取当前运行的Python文件的路径
current_file_path = os.path.abspath(__file__)
# 提取文件名
task_name = os.path.basename(current_file_path)
print(f'\n当前执行的脚本是[{task_name}],程序正在执行中>>>\n')
hosts = get_undifined_device_info()
pool = t_pool
# hosts是一个返回的生成器,需要进行循环遍历
for host in hosts:
# 单线程同步输出方式执行
# run_cmd(host, host['cmd_list'])
# 多线程异步处理
pool.apply_async(run_cmd, args=(task_name, host, host['cmd_list']))
pool.close()
pool.join()
return task_name, sucessful_list, failed_list
except Exception:
print('Something Wrong!')
if __name__ == "__main__":
main()
settings.py
定义一些参数、装饰器、共用函数
'''
全局设置和共用函数
'''
import os
from datetime import datetime
from functools import wraps
import ipaddress
from openpyxl import load_workbook
from multiprocessing.pool import ThreadPool
from netmiko import ConnectHandler, NetMikoAuthenticationException, NetMikoTimeoutException, ssh_exception
# 项目根目录
BASE_PATH = os.path.dirname((__file__))
# 文件输出目录
EXPORT_PATH = os.path.join(BASE_PATH, 'EXPORT')
# 定义目录名称为当天日期(格式:20220609)
dir_name = datetime.now().strftime("%Y%m%d")
# 目录创建
new_path = os.path.join(EXPORT_PATH, dir_name)
if not os.path.isdir(new_path):
os.makedirs(new_path)
backup_path = os.path.normpath(
os.path.join(EXPORT_PATH, dir_name, 'config_backup'))
config_path = os.path.normpath(
os.path.join(EXPORT_PATH, dir_name, 'config_add'))
# generate_table = os.path.normpath(
# os.path.join(EXPORT_PATH, dir_name, 'generate_table'))
if not os.path.isdir(backup_path):
os.makedirs(backup_path)
if not os.path.isdir(config_path):
os.makedirs(config_path)
# if not os.path.isdir(generate_table):
# os.makedirs(generate_table)
# 表格数据文件路径
device_file = "dev_data.xlsx"
# ThreadPool 设定异步进程数为66
t_pool = ThreadPool(66)
# 定义列表保存执行成功和失败的主机IP
sucessful_list = []
failed_list = []
# 手动输入登陆信息
def login_start():
# login module,校验由netmiko完成
# login_user = input('Login:')
# login_pwd = getpass.getpass('Passwd:')
# # 明文写死在代码,不安全的方式
# login_user = 'admin'
# login_pwd = 'xxx'
# return login_user, login_pwd
pass
# 加载excel文件
def load_excel():
try:
wb = load_workbook(device_file)
return wb
except FileNotFoundError:
print("{} excel文件不存在".format(device_file))
except Exception:
print("载入读取{} excel文件失败".format(device_file))
# 获取设备数据信息
def get_device_info(task_name):
try:
# by openpyxl
# user, pwd = login_start()
wb = load_excel()
ws1 = wb[wb.sheetnames[0]]
# 选定单元格数据区域
for row in ws1.iter_rows(min_row=2, max_col=9):
# 判断IP所在的列不为空值,则进行如下代码
if row[2].value:
if str(row[1].value).strip() == '#':
continue
info_dict = {
'ip':
row[2].value,
'username':
row[5].value,
'password':
row[6].value,
'protocol':
row[3].value,
'port':
row[4].value,
'secret':
row[7].value,
'device_type':
row[8].value,
'cmd_list':
get_cmd_info(task_name, wb[row[8].value.strip().lower()]),
}
yield info_dict
else:
break
except Exception as e:
print("get_device_info failed: {}".format(e))
finally:
wb.close()
# 获取命令信息
def get_cmd_info(task_name, sheet_name):
cmd_list = []
try:
# by openpyxl
for row in sheet_name.iter_rows(min_row=2, max_col=3):
# 若单元格使用“#”进行注释或命令为空值,则跳过该行
if str(row[0].value).strip() != '#' and row[1].value and task_name == 'backup_config.py':
cmd_list.append(row[1].value.strip())
elif str(row[0].value).strip() != '#' and row[2].value and task_name == 'add_config.py':
cmd_list.append(row[2].value.strip())
return cmd_list
except Exception as e:
print("get_cmd_info Error: ", e)
# 获取自定义设备数据信息
def get_undifined_device_info():
try:
# by openpyxl
wb = load_excel()
ws1 = wb[wb.sheetnames[5]]
row_number = 2
# 选定单元格数据区域
for row in ws1.iter_rows(min_row=2, max_col=9):
# 获取执行命令
undifined_cmd_info = []
for cols in ws1.iter_cols(min_col=10,
min_row=row_number,
max_row=row_number,
values_only=True):
for col in cols:
if col is None:
continue
undifined_cmd_info.append(col)
row_number += 1
# 判断IP所在的列不为空值,执行如下代码
if row[2].value:
if str(row[1].value).strip() == '#':
continue
info_dict = {
'ip':
row[2].value,
'username':
row[5].value,
'password':
row[6].value,
'protocol':
row[3].value,
'port':
row[4].value,
'secret':
row[7].value,
'device_type':
row[8].value,
'cmd_list': undifined_cmd_info,
}
yield info_dict
else:
break
except Exception as e:
print("get_device_info failed: {}".format(e))
finally:
wb.close()
# netmiko 连接处理
def connect_handler(host):
try:
connect = ''
# 判断使用SSH协议
# 将“protocol”列单元格的内容大写转小写,去除前后空格对比是否为 ssh
if host['protocol'].lower().strip() == 'ssh':
# 判断“port”列单元格,若单元格填写的内容不是22或空,则定义为22
host['port'] = host['port'] if (host['port'] not in [22, None]) else 22
# 剔除多余connectHandler 不需要的参数,protocol、secret、cmd_list,华为华为设备无特权密码
host.pop('protocol'), host.pop('cmd_list')
if 'huawei' in host['device_type']:
host.pop('secret')
connect = ConnectHandler(**host, conn_timeout=10)
elif 'hp_comware' in host['device_type']:
host.pop('secret')
connect = ConnectHandler(**host)
else:
connect = ConnectHandler(**host, conn_timeout=10)
# 判断使用Telnet协议
elif host['protocol'].lower().strip() == 'telnet':
# 判断“port”列单元格,若单元格填写的内容不是23或空,则定义为23
host['port'] = host['port'] if (host['port'] not in [23, None]) else 23
# 剔除多余connectHandler 不需要的参数,protocol、secret、cmd_list,华三华为设备无特权密码
host.pop('protocol'), host.pop('secret'), host.pop('cmd_list')
# netmiko 支持telnet协议,设备类型格式为:hp_comware_telnet
host['device_type'] = host['device_type'] + '_telnet'
# fast_cli=false,待测试参数
connect = ConnectHandler(**host, fast_cli=False)
# 不在以上两种协议内的连接
else:
res = "暂不支持IP地址为{}_的设备使用{}协议登陆".format(host['ip'], host['protocol'])
raise ValueError(res)
return True, connect
except NetMikoTimeoutException:
res = "{} Can not connect to Device!".format(host['ip'])
print(res)
return False, res
except NetMikoAuthenticationException:
res = "{} username/password wrong!".format(host['ip'])
print(res)
return False, res
except ssh_exception:
res = "{} SSH parameter problem!".format(host['ip'])
print(res)
return False, res
except Exception as e:
print("{} Failed: {}".format(host['ip'], e))
return False, res
# 记录程序执行时间装饰器
def timer(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = datetime.now()
func(*args, **kwargs)
end_time = datetime.now()
# print('\n' + '-' * 42)
print('执行完毕,共耗时 {:0.2f} 秒.'.format((end_time - start_time).total_seconds()))
print('-' * 42)
# return res
return wrapper
# 记录运行结果的装饰器
def result_count(func):
@wraps(func)
def wrapper(*args, **kwargs):
task_name, sucessful_list, failed_list = func(*args, **kwargs)
result_count = ('设备总数 {} 台,成功 {} 台,失败 {} 台.'.format(
len(sucessful_list) + len(failed_list), len(sucessful_list), len(failed_list)))
print('\n' + '-' * 42)
print(result_count)
result_path = os.path.normpath(os.path.join(EXPORT_PATH, dir_name, f'result_{dir_name}.log'))
print(f'\n运行结果保存路径: \"{result_path}\"\n')
return task_name, sucessful_list, failed_list
return wrapper
# 写入文件
def write_to_file(task_name, output_filename, output):
# 写入结果到文件
if task_name == 'backup_config.py':
with open(os.path.join(backup_path, output_filename), 'a', encoding="utf-8") as f:
f.write(output)
elif task_name == 'add_config.py':
with open(os.path.join(config_path, output_filename), 'a', encoding="utf-8") as f:
f.write(output)
elif task_name == 'undifined.py':
with open(os.path.join(config_path, output_filename), 'a', encoding="utf-8") as f:
f.write(output)
else:
pass
# 保存运行结果记录的装饰器
def result_write(func):
@wraps(func)
def wrapper(*args, **kwargs):
task_name, sucessful_list, failed_list = func(*args, **kwargs)
result_count = ('设备总数 {} 台,成功 {} 台,失败 {} 台.'.format(
len(sucessful_list) + len(failed_list), len(sucessful_list), len(failed_list)))
# time_str = datetime.now()
time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
result_path = os.path.normpath(
os.path.join(EXPORT_PATH, dir_name, f'result_{dir_name}.log'))
with open(result_path, 'a', encoding="utf-8") as f:
log_title = task_name.center(100, '=') + '\n' + time_str.center(100, '=') + '\n'
f.write(log_title)
f.write(result_count + '\n')
f.write('\n执行成功设备列表:\n')
for i in sucessful_list:
f.write(i)
f.write('\n')
f.write('\nNG设备列表:\n')
for i in failed_list:
f.write(i)
f.write('\n')
f.write('\n')
return task_name, sucessful_list, failed_list
return wrapper
# 判断是否是正确格式的IP地址,IP地址网络,IP地址范围
def is_valid_ipv4_input(ipv4_str):
try:
# 尝试将输入解析为 IPv4Address
ipaddress.IPv4Address(ipv4_str)
return True
except ValueError:
return False
# try:
# # 将输入拆分为两个 IP 地址
# start, end = ipv4_str.split('-')
# # 尝试将输入解析为 IPv4Address
# ipaddress.IPv4Address(start.strip())
# ipaddress.IPv4Address(end.strip())
# return True
# except ValueError as e:
# # 尝试将输入解析为 IPv4Network
# try:
# network = ipaddress.IPv4Network(ipv4_str)
# if network.hostmask != '0.0.0.0':
# return True
# else:
# return False
# except ValueError as e:
# # print(e)
# return False
dev_data.xlsx
定义设备登陆参数和命令
requirements.txt
使用到的第三方库
bcrypt==4.0.1
cffi==1.15.1
cryptography==41.0.1
et-xmlfile==1.1.0
future==0.18.3
importlib-metadata==6.8.0
netmiko==3.4.0
ntc-templates==3.4.0
openpyxl==3.1.2
paramiko==3.2.0
ping3==4.0.4
platformdirs==3.8.1
pycparser==2.21
PyNaCl==1.5.0
pyserial==3.5
PyYAML==6.0
scp==0.14.5
six==1.16.0
tenacity==8.2.2
textfsm==1.1.3
tomli==2.0.1
yapf==0.40.1
zipp==3.15.0