对网络设备进行批量操作的几个脚本

对网络设备进行批量操作的几个脚本

netmiko基操

32位windows下偷个懒使用

在32位windows系统中无法使用之前编写的nornir脚本,原因是有几个第三方库从某个版本开始不再开发和维护32位系统的代码,而库的依赖最低版本大于最后发布的32位版本,或者装不上,亦或装上了无法用。So……

github仓库地址:https://github.com/kiraster/netops_alpha_v1.1

部分代码源码来自这位大佬:点滴技术的个人空间_哔哩哔哩_bilibili

有需要的自行研究哈,有建设性的好玩的想法的,欢迎来函探讨

如要使用试用把玩,建议虚拟环境下运行

add_config.py

批量对设备添加相同配置

import os
from settings import *


# 执行配置命令
def run_cmd(task_name, host, cmds, enable=False):

    enable = True if host['secret'] else False
    flag, conn = connect_handler(host)
    try:
        if flag:
            # 从返回提示符获取设备名称
            hostname = conn.find_prompt().replace('<', '').replace('>', '').replace('#', '').strip()
            print('正在为设备[{}]添加配置……'.format(hostname))
            # 文件保存路径和文件名 '当前目录\\EXPORT\\当天日期\\config_add\\hostname+ip+当前时间.txt'
            logtime = datetime.now().strftime("%H%M%S")
            output_filename = hostname + '_' + host['ip'] + '_' + logtime + '.txt'

            if cmds:
                # 判断单元表里命令是否为空值
                output = ''
                if enable:
                    # 判断是否需要进入enable特权模式
                    conn.enable()
                    output += conn.send_config_set(config_commands=cmds)

                else:
                    output += conn.send_config_set(config_commands=cmds)
            else:
                # 使用ftp/tftp/sftp/scp
                pass
            conn.disconnect()
            
            # write_data
            write_to_file(task_name, output_filename, output)
            sucessful_list.append(host['ip'] + ' ' + hostname)
        else:
            failed_list.append(conn)
            pass

    except Exception as e:
        print(f"run_cmd Failed: {e}")

@timer
@result_write
@result_count
# 代码运行主体框架设计
def main():
    try:
        # 获取当前运行的Python文件的路径
        current_file_path = os.path.abspath(__file__)
        # 提取文件名
        task_name = os.path.basename(current_file_path)
        print(f'\n当前执行的脚本是[{task_name}],程序正在执行中>>>\n')

        hosts = get_device_info(task_name)
        pool = t_pool
        # hosts是一个返回的生成器,需要进行循环遍历
        for host in hosts:
            # 单线程同步输出方式执行
            # run_cmd(host, host['cmd_list'])
            # 多线程异步处理
            pool.apply_async(run_cmd, args=(task_name, host, host['cmd_list']))
        pool.close()
        pool.join()

        return task_name, sucessful_list, failed_list

    except Exception:
        print('Something Wrong!')


if __name__ == "__main__":
    main()

backup_config.py

批量导出设备配置

import os
from settings import *


# 执行display/show命令
def run_cmd(task_name, host, cmds, enable=False):

    enable = True if host['secret'] else False
    flag, conn = connect_handler(host)
    try:
        if flag:
            # 从返回提示符获取设备名称
            hostname = conn.find_prompt().replace('<', '').replace('>', '').replace('#', '').strip()
            print("正在获取设备[{}]的配置……".format(hostname))
            # 文件保存路径和文件名 '当前目录\\EXPORT\\当天日期\\config_backup\\hostname+ip+当前时间.txt'
            logtime = datetime.now().strftime("%H%M%S")
            output_filename = hostname + '_' + host['ip'] + '_' + logtime + '.txt'

            if cmds:
                # 判断单元表里命令是否为空值
                output = ''
                for cmd in cmds:
                    if enable:
                        # 判断是否需要进入enable特权模式
                        conn.enable()
                        output +=  '\n' + '=' * 100 + '\n' + cmd.center(100, '=') + '\n'
                        output += conn.send_command(command_string=cmd)

                    else:
                        output +=  '\n' + '=' * 100 + '\n' + cmd.center(100, '=') + '\n'
                        output += conn.send_command(command_string=cmd)
            else:
                # 使用ftp/tftp/sftp/scp
                pass
            conn.disconnect()
            
            # write_data
            write_to_file(task_name, output_filename, output)
            sucessful_list.append(host['ip'] + ' ' + hostname)
        else:
            failed_list.append(conn)
            pass

    except Exception as e:
        print(f"run_cmd Failed: {e}")

@timer
@result_write
@result_count
# 代码运行主体框架设计
def main():
    try:
        # 获取当前运行的Python文件的路径
        current_file_path = os.path.abspath(__file__)
        # 提取文件名
        task_name = os.path.basename(current_file_path)
        print(f'\n当前执行的脚本是[{task_name}],程序正在执行中>>>\n')

        hosts = get_device_info(task_name)
        pool = t_pool
        # hosts是一个返回的生成器,需要进行循环遍历
        for host in hosts:
            # 单线程同步输出方式执行
            # run_cmd(host, host['cmd_list'])
            # 多线程异步处理
            pool.apply_async(run_cmd, args=(task_name, host, host['cmd_list']))
        pool.close()
        pool.join()

        return task_name, sucessful_list, failed_list

    except Exception:
        print('Something Wrong!')


if __name__ == "__main__":
    main()

ping.py

批量对设备进行ping测试

import os
import ping3
import time
from settings import *


# 批量ping测试
def ping_test(host):
    
    ping3.EXCEPTIONS = True
    time.sleep(2)
    try:
        ping3.ping(host['ip'])
        res = "{:<18}ping测试成功.".format(host['ip'])
        print(res)
        sucessful_list.append(host['ip'])
    except ping3.errors.HostUnknown:
        res = "{:<18}ping测试失败. Host unknown error raised.".format(host['ip'])
        print(res)
        failed_list.append(host['ip'])
    except ping3.errors.PingError:
        res = "{:<18}ping测试失败. A ping error raised.".format(host['ip'])
        print(res)
        failed_list.append(host['ip'])


@timer
@result_write
@result_count
# 代码运行主体框架设计
def main():
    try:
        # 获取当前运行的Python文件的路径
        current_file_path = os.path.abspath(__file__)
        # 提取文件名
        task_name = os.path.basename(current_file_path)
        print(f'\n当前执行的脚本是[{task_name}],程序正在执行中>>>\n')

        hosts = get_device_info(task_name)
        pool = t_pool
        # hosts是一个返回的生成器,需要进行循环遍历
        for host in hosts:
            # 单线程同步输出方式执行
            # run_cmd(host, host['cmd_list'])
            # 多线程异步处理
            pool.apply_async(ping_test, args=(host,))
        pool.close()
        pool.join()

        return task_name, sucessful_list, failed_list

    except Exception:
        print('Something Wrong!')


if __name__ == "__main__":
    main()

ssh_connect_test.py

批量对设备进行ssh连接测试

import os
from settings import *


# SSH测试连接设备
def connect_test(host):
    
    try:
        flag, conn = connect_handler(host)
        if flag:
            # 获取到设备名称则表示ssh连接测试成功
            hostname = conn.find_prompt()
            result = '{} SSH测试连接成功,获取到设备提示符: {}'.format(host['ip'], hostname)
            print(result)
            conn.disconnect()
            sucessful_list.append(result)
        else:
            # SSH连接测试失败,同记录
            result = '{} SSH测试连接失败,未获取到设备提示符'.format(host['ip'])
            failed_list.append(result)

    except Exception as e:
        print("connect_test Failed: {}".format(e))
        failed_list.append(host['ip'])


@timer
@result_write
@result_count
# 代码运行主体框架设计
def main():
    try:
        # 获取当前运行的Python文件的路径
        current_file_path = os.path.abspath(__file__)
        # 提取文件名
        task_name = os.path.basename(current_file_path)
        print(f'\n当前执行的脚本是[{task_name}],程序正在执行中>>>\n')

        hosts = get_device_info(task_name)
        pool = t_pool
        # hosts是一个返回的生成器,需要进行循环遍历
        for host in hosts:
            # 单线程同步输出方式执行
            # run_cmd(host, host['cmd_list'])
            # 多线程异步处理
            pool.apply_async(connect_test, args=(host,))
        pool.close()
        pool.join()

        return task_name, sucessful_list, failed_list

    except Exception:
        print('Something Wrong!')


if __name__ == "__main__":
    main()

undifined.py

批量对设备添加不同配置

import os
from settings import *


# 执行配置命令
def run_cmd(task_name, host, cmds, enable=False):

    enable = True if host['secret'] else False
    flag, conn = connect_handler(host)
    try:
        if flag:
            # 从返回提示符获取设备名称
            hostname = conn.find_prompt().replace('<', '').replace('>', '').replace('#', '').strip()
            print('正在为设备[{}]添加配置……'.format(hostname))
            # 文件保存路径和文件名 '当前目录\\EXPORT\\当天日期\\config_add\\hostname+ip+当前时间.txt'
            logtime = datetime.now().strftime("%H%M%S")
            output_filename = hostname + '_' + host['ip'] + '_' + logtime + '.txt'

            if cmds:
                # 判断单元表里命令是否为空值
                output = ''
                if enable:
                    # 判断是否需要进入enable特权模式
                    conn.enable()
                    output += conn.send_config_set(config_commands=cmds)

                else:
                    output += conn.send_config_set(config_commands=cmds)
            else:
                pass
            conn.disconnect()
            
            # write_data
            write_to_file(task_name, output_filename, output)
            sucessful_list.append(host['ip'] + ' ' + hostname)
        else:
            failed_list.append(conn)
            pass

    except Exception as e:
        print(f"run_cmd Failed: {e}")


@timer
@result_write
@result_count
# 代码运行主体框架设计
def main():
    try:
        # 获取当前运行的Python文件的路径
        current_file_path = os.path.abspath(__file__)
        # 提取文件名
        task_name = os.path.basename(current_file_path)
        print(f'\n当前执行的脚本是[{task_name}],程序正在执行中>>>\n')

        hosts = get_undifined_device_info()
        pool = t_pool
        # hosts是一个返回的生成器,需要进行循环遍历
        for host in hosts:
            # 单线程同步输出方式执行
            # run_cmd(host, host['cmd_list'])
            # 多线程异步处理
            pool.apply_async(run_cmd, args=(task_name, host, host['cmd_list']))
        pool.close()
        pool.join()

        return task_name, sucessful_list, failed_list

    except Exception:
        print('Something Wrong!')


if __name__ == "__main__":
    main()

settings.py

定义一些参数、装饰器、共用函数

'''
全局设置和共用函数
'''
import os
from datetime import datetime
from functools import wraps
import ipaddress
from openpyxl import load_workbook
from multiprocessing.pool import ThreadPool
from netmiko import ConnectHandler, NetMikoAuthenticationException, NetMikoTimeoutException, ssh_exception


# 项目根目录
BASE_PATH = os.path.dirname((__file__))

# 文件输出目录
EXPORT_PATH = os.path.join(BASE_PATH, 'EXPORT')

# 定义目录名称为当天日期(格式:20220609)
dir_name = datetime.now().strftime("%Y%m%d")

# 目录创建
new_path = os.path.join(EXPORT_PATH, dir_name)
if not os.path.isdir(new_path):
    os.makedirs(new_path)

backup_path = os.path.normpath(
    os.path.join(EXPORT_PATH, dir_name, 'config_backup'))
config_path = os.path.normpath(
    os.path.join(EXPORT_PATH, dir_name, 'config_add'))
# generate_table = os.path.normpath(
#     os.path.join(EXPORT_PATH, dir_name, 'generate_table'))

if not os.path.isdir(backup_path):
    os.makedirs(backup_path)
if not os.path.isdir(config_path):
    os.makedirs(config_path)
# if not os.path.isdir(generate_table):
#     os.makedirs(generate_table)

# 表格数据文件路径
device_file = "dev_data.xlsx"

# ThreadPool 设定异步进程数为66
t_pool = ThreadPool(66)

# 定义列表保存执行成功和失败的主机IP
sucessful_list = []
failed_list = []


# 手动输入登陆信息
def login_start():
    # login module,校验由netmiko完成
    # login_user = input('Login:')
    # login_pwd = getpass.getpass('Passwd:')
    # # 明文写死在代码,不安全的方式
    # login_user = 'admin'
    # login_pwd = 'xxx'
    # return login_user, login_pwd
    pass


# 加载excel文件
def load_excel():
    try:
        wb = load_workbook(device_file)
        return wb
    except FileNotFoundError:
        print("{} excel文件不存在".format(device_file))
    except Exception:
        print("载入读取{} excel文件失败".format(device_file))


# 获取设备数据信息
def get_device_info(task_name):
    try:
        # by openpyxl
        # user, pwd = login_start()
        wb = load_excel()
        ws1 = wb[wb.sheetnames[0]]
        # 选定单元格数据区域
        for row in ws1.iter_rows(min_row=2, max_col=9):
            # 判断IP所在的列不为空值,则进行如下代码
            if row[2].value:
                if str(row[1].value).strip() == '#':
                    continue
                info_dict = {
                    'ip':
                    row[2].value,
                    'username':
                    row[5].value,
                    'password':
                    row[6].value,
                    'protocol':
                    row[3].value,
                    'port':
                    row[4].value,
                    'secret':
                    row[7].value,
                    'device_type':
                    row[8].value,
                    'cmd_list':
                    get_cmd_info(task_name, wb[row[8].value.strip().lower()]),
                }
                yield info_dict
            else:
                break
    except Exception as e:
        print("get_device_info failed: {}".format(e))
    finally:
        wb.close()


# 获取命令信息
def get_cmd_info(task_name, sheet_name):
    cmd_list = []
    try:
        # by openpyxl
        for row in sheet_name.iter_rows(min_row=2, max_col=3):
            # 若单元格使用“#”进行注释或命令为空值,则跳过该行
            if str(row[0].value).strip() != '#' and row[1].value and task_name == 'backup_config.py':
                cmd_list.append(row[1].value.strip())
            elif str(row[0].value).strip() != '#' and row[2].value and task_name == 'add_config.py':
                cmd_list.append(row[2].value.strip())
        return cmd_list
    except Exception as e:
        print("get_cmd_info Error: ", e)


# 获取自定义设备数据信息
def get_undifined_device_info():
    try:
        # by openpyxl
        wb = load_excel()
        ws1 = wb[wb.sheetnames[5]]
        row_number = 2
        # 选定单元格数据区域
        for row in ws1.iter_rows(min_row=2, max_col=9):
            # 获取执行命令
            undifined_cmd_info = []
            for cols in ws1.iter_cols(min_col=10,
                                        min_row=row_number,
                                        max_row=row_number,
                                        values_only=True):
                for col in cols:
                    if col is None:
                        continue
                    undifined_cmd_info.append(col)
            row_number += 1

            # 判断IP所在的列不为空值,执行如下代码
            if row[2].value:
                if str(row[1].value).strip() == '#':
                    continue
                info_dict = {
                    'ip':
                    row[2].value,
                    'username':
                    row[5].value,
                    'password':
                    row[6].value,
                    'protocol':
                    row[3].value,
                    'port':
                    row[4].value,
                    'secret':
                    row[7].value,
                    'device_type':
                    row[8].value,
                    'cmd_list': undifined_cmd_info,
                }
                yield info_dict
            else:
                break
    except Exception as e:
        print("get_device_info failed: {}".format(e))
    finally:
        wb.close()


# netmiko 连接处理
def connect_handler(host):
    try:
        connect = ''
        # 判断使用SSH协议
        # 将“protocol”列单元格的内容大写转小写,去除前后空格对比是否为 ssh
        if host['protocol'].lower().strip() == 'ssh':
            #  判断“port”列单元格,若单元格填写的内容不是22或空,则定义为22
            host['port'] = host['port'] if (host['port'] not in [22, None]) else 22
            # 剔除多余connectHandler 不需要的参数,protocol、secret、cmd_list,华为华为设备无特权密码
            host.pop('protocol'), host.pop('cmd_list')

            if 'huawei' in host['device_type']:
                host.pop('secret')
                connect = ConnectHandler(**host, conn_timeout=10)
            elif 'hp_comware' in host['device_type']:
                host.pop('secret')
                connect = ConnectHandler(**host)
            else:
                connect = ConnectHandler(**host, conn_timeout=10)
        # 判断使用Telnet协议
        elif host['protocol'].lower().strip() == 'telnet':
            # 判断“port”列单元格,若单元格填写的内容不是23或空,则定义为23
            host['port'] = host['port'] if (host['port'] not in [23, None]) else 23
            # 剔除多余connectHandler 不需要的参数,protocol、secret、cmd_list,华三华为设备无特权密码
            host.pop('protocol'), host.pop('secret'), host.pop('cmd_list')
            # netmiko 支持telnet协议,设备类型格式为:hp_comware_telnet
            host['device_type'] = host['device_type'] + '_telnet'
            # fast_cli=false,待测试参数
            connect = ConnectHandler(**host, fast_cli=False)
        # 不在以上两种协议内的连接
        else:
            res = "暂不支持IP地址为{}_的设备使用{}协议登陆".format(host['ip'], host['protocol'])
            raise ValueError(res)
        return True, connect

    except NetMikoTimeoutException:
        res = "{} Can not connect to Device!".format(host['ip'])
        print(res)
        return False, res
    except NetMikoAuthenticationException:
        res = "{} username/password wrong!".format(host['ip'])
        print(res)
        return False, res
    except ssh_exception:
        res = "{} SSH parameter problem!".format(host['ip'])
        print(res)
        return False, res
    except Exception as e:
        print("{} Failed: {}".format(host['ip'], e))
        return False, res


# 记录程序执行时间装饰器
def timer(func):

    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = datetime.now()
        func(*args, **kwargs)
        end_time = datetime.now()
        # print('\n' + '-' * 42)
        print('执行完毕,共耗时 {:0.2f} 秒.'.format((end_time - start_time).total_seconds()))
        print('-' * 42)
        # return res

    return wrapper


# 记录运行结果的装饰器
def result_count(func):

    @wraps(func)
    def wrapper(*args, **kwargs):
       
        task_name, sucessful_list, failed_list = func(*args, **kwargs)
        result_count = ('设备总数 {} 台,成功 {} 台,失败 {} 台.'.format(
            len(sucessful_list) + len(failed_list), len(sucessful_list), len(failed_list)))
        print('\n' + '-' * 42)
        print(result_count)
        result_path = os.path.normpath(os.path.join(EXPORT_PATH, dir_name, f'result_{dir_name}.log'))
        print(f'\n运行结果保存路径: \"{result_path}\"\n')

        return task_name, sucessful_list, failed_list

    return wrapper


# 写入文件
def write_to_file(task_name, output_filename, output):
    # 写入结果到文件
    if task_name == 'backup_config.py':
        with open(os.path.join(backup_path, output_filename), 'a', encoding="utf-8") as f:
            f.write(output)
    elif task_name == 'add_config.py':
        with open(os.path.join(config_path, output_filename), 'a', encoding="utf-8") as f:
            f.write(output)
    elif task_name == 'undifined.py':
        with open(os.path.join(config_path, output_filename), 'a', encoding="utf-8") as f:
            f.write(output)
    else:
        pass


# 保存运行结果记录的装饰器
def result_write(func):

    @wraps(func)
    def wrapper(*args, **kwargs):

        task_name, sucessful_list, failed_list = func(*args, **kwargs)
        result_count = ('设备总数 {} 台,成功 {} 台,失败 {} 台.'.format(
            len(sucessful_list) + len(failed_list), len(sucessful_list), len(failed_list)))

        # time_str = datetime.now()
        time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')

        result_path = os.path.normpath(
            os.path.join(EXPORT_PATH, dir_name, f'result_{dir_name}.log'))

        with open(result_path, 'a', encoding="utf-8") as f:
            log_title = task_name.center(100, '=') + '\n' + time_str.center(100, '=') + '\n'
            f.write(log_title)
            f.write(result_count + '\n')
            f.write('\n执行成功设备列表:\n')
            for i in sucessful_list:
                f.write(i)
                f.write('\n')

            f.write('\nNG设备列表:\n')
            for i in failed_list:
                f.write(i)
                f.write('\n')
            f.write('\n')

        return task_name, sucessful_list, failed_list

    return wrapper


# 判断是否是正确格式的IP地址,IP地址网络,IP地址范围
def is_valid_ipv4_input(ipv4_str):

    try:
        # 尝试将输入解析为 IPv4Address
        ipaddress.IPv4Address(ipv4_str)
        return True
    except ValueError:
        return False
        # try:
        #     # 将输入拆分为两个 IP 地址
        #     start, end = ipv4_str.split('-')
        #     # 尝试将输入解析为 IPv4Address
        #     ipaddress.IPv4Address(start.strip())
        #     ipaddress.IPv4Address(end.strip())
        #     return True
        # except ValueError as e:
        #     # 尝试将输入解析为 IPv4Network
        #     try:
        #         network = ipaddress.IPv4Network(ipv4_str)
        #         if network.hostmask != '0.0.0.0':
        #             return True
        #         else:
        #             return False
        #     except ValueError as e:
        #         # print(e)
        #         return False

dev_data.xlsx

定义设备登陆参数和命令

ScreenCaputure230709135002

requirements.txt

使用到的第三方库

bcrypt==4.0.1
cffi==1.15.1
cryptography==41.0.1
et-xmlfile==1.1.0
future==0.18.3
importlib-metadata==6.8.0
netmiko==3.4.0
ntc-templates==3.4.0
openpyxl==3.1.2
paramiko==3.2.0
ping3==4.0.4
platformdirs==3.8.1
pycparser==2.21
PyNaCl==1.5.0
pyserial==3.5
PyYAML==6.0
scp==0.14.5
six==1.16.0
tenacity==8.2.2
textfsm==1.1.3
tomli==2.0.1
yapf==0.40.1
zipp==3.15.0