H3C 防火墙(v7)提取导出安全策略至表格

当前日期时间: 20221201 版本:H3C Comware Software, Version 7.1.064, Release 9628P2416还没有安全策略导出到表格的功能,而有时确实有实际需求导出安全策略到表格,无论是存档还是提交报告

本文介绍使用python脚本语言编写一个根据display回显内容提取安全策略和对象组到表格的方法

环境介绍:

{% blockquote %}

Python 3.9.7

openpyxl 3.0.10

HCL 3.0.1 (现网环境都为敏感内容,勉为其难用模拟器)

VScode 1.73.1

{% endblockquote %}

提取工作过程

命令行登陆设备导出display回显内容

利用SecureCRTLog Session(记录日志)功能生成一个包含回显内容的log文件,放置在与脚本文件相同目录

本次使用的是205550.log',在下面代码中更换为自己的文件名称

使用的命令

# 查看安全策略
dis security-policy ip
# 查看地址对象组
dis object-group ip address 
# 查看服务对象组
dis object-group service 
<Core_FW>dis security-policy ip
Security-policy ip 

 rule 2 name management-local
  action pass
  source-zone management
  destination-zone local
……
……
……
<Core_FW>dis object-group ip address 
Ip address object group api.test.com: 1 object(in use)
 0 network host name api.test.com
……
……
……
<Core_FW>dis object-group service 
Service object group A服务器端口服务: 3 objects(in use)
 0 service tcp destination eq 1433
 10 service tcp destination range 300 3010
 20 service udp destination eq 2222

Service object group 访问test.comAPI接口: 2 objects(in use)
 0 service tcp destination eq 443
 10 service tcp destination range 9000 9100

<Core_FW>

运行的代码

from openpyxl import Workbook
from openpyxl.styles import PatternFill
import openpyxl
from datetime import datetime


class SecPolicy2XLSX:

    # 初始化参数
    def __init__(self):

        # 文件名时间部分,用于区别新旧
        self.create_time = datetime.now().strftime("%Y%m%d%H%M%S")

        # 读取文件,填写同目录从防火墙命令巴拉下来的文件
        self.export_cfg = '205550.log'
        self.f = open(self.export_cfg, 'r', encoding='utf8')

        # 表格 - 表头数据
        self.sheet1_th = [[
            '名称', 'ID', '源安全域', '目的安全域', '类型', '描述', '源地址', \
            '目的地址', '服务', '用户',
            '动作', '内容安全', '记录日志', '策略匹配统计', '启用'
        ]]
        self.sheet2_th = [['对象组名称和被引用', '对象']]

        # 表格初始数据
        self.wb = Workbook()
        self.ws1 = self.wb.active
        self.ws1.title = '安全策略'
        self.ws2 = self.wb.create_sheet('地址对象组')
        self.ws3 = self.wb.create_sheet('服务对象组')

        # 写入表头
        for i in self.sheet1_th:
            self.ws1.append(i)
        for i in self.sheet2_th:
            self.ws2.append(i)
            self.ws3.append(i)

    # 提取安全策略信息
    def sp_comtent(self):

        # 保存单条策略内容的字典
        comtent_dict = {}

        # 保存以下几项内容的字典,原因在于命令行中显示为多个(每个一行)
        src_zone_list = []
        dst_zone_list = []
        src_ip_list = []
        dst_ip_list = []
        service_list = []

        while True:
            cfg_line = self.f.readline()
            if 'rule' in cfg_line:
                name = cfg_line.strip().split(' ', 3)[3]
                id = cfg_line.split()[1]
                comtent_dict['name'] = name
                comtent_dict['id'] = id

            if 'source-zone' in cfg_line:
                src_zone_list.append(cfg_line.strip().split()[1])

            if 'profile' in cfg_line:
                profile_type = cfg_line.strip().split()[1].split('_')[1]
                comtent_dict['profile_type'] = profile_type

            if 'description' in cfg_line:
                desc = cfg_line.strip()[12:]
                comtent_dict['desc'] = desc

            if 'destination-zone' in cfg_line:
                dst_zone_list.append(cfg_line.strip().split()[1])

            if 'source-ip' in cfg_line:
                src_ip_list.append(cfg_line.strip().split(' ', 1)[1])

            if 'destination-ip' in cfg_line:
                dst_ip_list.append(cfg_line.strip().split(' ', 1)[1])

            if 'action' in cfg_line:
                action = cfg_line.strip().split()[1]
                comtent_dict['action'] = action

            if 'logging ' in cfg_line:
                logg = cfg_line.strip().split()[1]
                comtent_dict['logg'] = logg

            if 'counting ' in cfg_line:
                count = cfg_line.strip().split()[1]
                comtent_dict['count'] = count

            if 'disable' in cfg_line:
                # enable = cfg_line.strip()
                comtent_dict['enable'] = 'disable'

            if 'service' in cfg_line:
                service_list.append(cfg_line.strip().split(' ', 1)[1])

            # display 回显的内容 空行分割一条安全策略
            if cfg_line.strip() == '':
                break

            # 把最终列表拼接到字符串
            src_zone = ','.join(src_zone_list)
            dst_zone = ','.join(dst_zone_list)
            src_ip = ','.join(src_ip_list)
            dst_ip = ','.join(dst_ip_list)
            service = ','.join(service_list)

            # 把最终列表拼接到字符串写入comtent_dict字典
            comtent_dict['src_zone'] = src_zone
            comtent_dict['dst_zone'] = dst_zone
            comtent_dict['src_ip'] = src_ip
            comtent_dict['dst_ip'] = dst_ip
            comtent_dict['service'] = service

        # 返回值是单条安全策略的字典
        return comtent_dict

    # 提取地址对象组信息
    def ip_obj_comtent(self):

        comtent_dict = {}
        networw_list = []

        while True:
            cfg_line = self.f.readline()
            if 'Ip address object group' in cfg_line:
                name = cfg_line.strip().split(' ', 4)[4]
                comtent_dict['name'] = name

            if 'network ' in cfg_line:
                networw_list.append(cfg_line.strip().split(' ', 2)[2])

            if cfg_line.strip() == '':
                break

        networw_list = ','.join(networw_list)
        comtent_dict['networw_list'] = networw_list

        # 返回值是单个地址对象组的字典
        return comtent_dict

    # 提取服务对象组信息
    def service_obj_comtent(self):

        comtent_dict = {}
        service_list = []

        while True:
            cfg_line = self.f.readline()
            if 'Service object group' in cfg_line:
                name = cfg_line.strip().split(' ', 3)[3]
                comtent_dict['name'] = name

            if 'service tcp' in cfg_line or 'service udp' in cfg_line:
                service_list.append(cfg_line.strip().split(' ', 2)[2])
            if cfg_line.strip() == '':
                break

        service_list = ','.join(service_list)
        comtent_dict['service_list'] = service_list

        # 返回值是单个服务对象组的字典
        return comtent_dict

    # 循环 返回全部安全策略的生成器
    def more_sp_comtent(self):

        # 找到文件末 的指针,用于判断是否读取完整个文件
        eof = self.f.seek(0, 2)
        self.f.seek(0, 0)
        while True:
            res = self.sp_comtent()
            yield res

            if self.f.tell() >= eof:
                break

    # 循环 返回全部地址对象组的生成器
    def more_ip_comtent(self):

        eof = self.f.seek(0, 2)
        self.f.seek(0, 0)
        while True:
            res = self.ip_obj_comtent()
            yield res

            if self.f.tell() >= eof:
                break

    # 循环 返回全部服务对象组的生成器
    def more_service_comtent(self):

        eof = self.f.seek(0, 2)
        self.f.seek(0, 0)
        while True:
            res = self.service_obj_comtent()
            yield res

            if self.f.tell() >= eof:
                break

    # 安全策略,移除和填充内容字典元素
    def sp_fill_task(self):

        # 生成器 转 列表套字典
        res = self.more_sp_comtent()
        comtent_list = list(res)

        # 移除不相干元素,列表套字典中字典不包含 'name' 键的字典
        index_list = []
        for i in comtent_list:

            key_list = list(i.keys())
            if 'name' not in key_list:
                # 字典中不包含 'name' 键的字典在comtent_list列表的索引值
                index_list.append(comtent_list.index(i))

        # 索引列表去重-->>降序排列-->>删元素
        index_list = list(set(index_list))
        index_list.sort(reverse=True)
        for i in index_list:
            comtent_list.pop(i)

        # comtent_list列表去重
        ret_comtent_list = []
        for i in comtent_list:
            if i not in ret_comtent_list:
                ret_comtent_list.append(i)
        # 去头
        ret_comtent_list = ret_comtent_list[1:]

        # 添加一些默认值,一些安全策略值在display回显的内容不显示
        for i in ret_comtent_list:
            i['user'] = 'Any'
            # 一般应用安全都是使用默认配置文件中default策略,如有自定义的配置文件就增加一个提取函数
            i['ap_profile'] = 'default'
            i.setdefault('desc', '-')
            # 安全策略没有启用显示disable,启用了display没有显示
            i.setdefault('enable', 'enable')
            # IPv4 网络环境默认
            i.setdefault('profile_type', 'IPv4')
            # 命中计数和流量统计,没有启用不在display显示
            i.setdefault('count', 'disable')
            # 记录日志
            i.setdefault('logg', 'enable')
            # print(i)

        # 键的值为空则替换为 Any
        ret_comtent_list = [{k: v if v and k != 'desc' else 'Any'
                             for k, v in i.items()} for i in ret_comtent_list]
        # 最后一条拒绝所有策略的描述
        ret_comtent_list[-1]['desc'] = ''

        self.ret_comtent_list = ret_comtent_list

        # 返回一个完整的列表套字典
        return self.ret_comtent_list

    # 地址对象组,移除和填充内容字典元素
    def ip_fill_task(self):

        # 生成器 转 列表套字典
        res = self.more_ip_comtent()
        comtent_list = list(res)

        # 列表去重
        ret_comtent_list = []
        for i in comtent_list:
            if i not in ret_comtent_list:
                ret_comtent_list.append(i)
        # 去头
        ret_comtent_list = ret_comtent_list[1:]

        self.ret_comtent_list = ret_comtent_list

        return self.ret_comtent_list

    def service_fill_task(self):

        # 生成器 转 列表套字典
        res = self.more_service_comtent()
        comtent_list = list(res)

        # 列表去重
        ret_comtent_list = []
        for i in comtent_list:
            if i not in ret_comtent_list:
                ret_comtent_list.append(i)
        # 去头
        ret_comtent_list = ret_comtent_list[1:]

        self.ret_comtent_list = ret_comtent_list

        # 在此处关闭文件
        self.f.close()

        return self.ret_comtent_list

    # 写入数据到表格 - 安全策略工作表
    def write2ws1(self):

        # 从第二行开始写入
        row = 2
        big_list = self.sp_fill_task()

        for i in big_list:
            self.ws1.cell(row=row, column=1, value=i['name'])
            self.ws1.cell(row=row, column=2, value=i['id'])
            self.ws1.cell(row=row, column=3, value=i['src_zone'])
            self.ws1.cell(row=row, column=4, value=i['dst_zone'])
            self.ws1.cell(row=row, column=5, value=i['profile_type'])
            self.ws1.cell(row=row, column=6, value=i['desc'])
            self.ws1.cell(row=row, column=7, value=i['src_ip'])
            self.ws1.cell(row=row, column=8, value=i['dst_ip'])
            self.ws1.cell(row=row, column=9, value=i['service'])
            self.ws1.cell(row=row, column=10, value=i['user'])
            self.ws1.cell(row=row, column=11, value=i['action'])
            self.ws1.cell(row=row, column=12, value=i['ap_profile'])
            self.ws1.cell(row=row, column=13, value=i['logg'])
            self.ws1.cell(row=row, column=14, value=i['count'])
            self.ws1.cell(row=row, column=15, value=i['enable'])
            row += 1

    # 写入数据到表格 - 地址对象组工作表
    def write2ws2(self):

        row = 2
        big_list = self.ip_fill_task()

        for i in big_list:
            self.ws2.cell(row=row, column=1, value=i['name'])
            self.ws2.cell(row=row, column=2, value=i['networw_list'])
            row += 1

    # 写入数据到表格 - 地址对象组工作表
    def write2ws3(self):

        row = 2
        big_list = self.service_fill_task()

        for i in big_list:
            self.ws3.cell(row=row, column=1, value=i['name'])
            self.ws3.cell(row=row, column=2, value=i['service_list'])
            row += 1

    # 调整表格显示 保存xlsx表格
    def save_to_file(self):

        # 修改ws1列宽
        self.ws1.column_dimensions['A'].width = 50
        self.ws1.column_dimensions['B'].width = 4
        self.ws1.column_dimensions['C'].width = 46
        self.ws1.column_dimensions['D'].width = 38
        self.ws1.column_dimensions['F'].width = 50
        self.ws1.column_dimensions['G'].width = 50
        self.ws1.column_dimensions['H'].width = 50
        self.ws1.column_dimensions['I'].width = 50
        self.ws1.column_dimensions['N'].width = 12

        # 修改ws2列宽
        self.ws2.column_dimensions['A'].width = 50
        self.ws2.column_dimensions['B'].width = 255

        # 修改ws3列宽
        self.ws3.column_dimensions['A'].width = 50
        self.ws3.column_dimensions['B'].width = 255

        # 修改列 垂直居中 自动换行,表头背景色 ,冻结首行
        self.ws1.freeze_panes = 'A2'
        self.ws2.freeze_panes = 'A2'
        self.ws3.freeze_panes = 'A2'

        th_color_fill = PatternFill('solid', fgColor='D3D3D3')
        for col in range(1, 16):
            self.ws1.cell(row=1, column=col).fill = th_color_fill
        for col in range(1, 3):
            self.ws2.cell(row=1, column=col).fill = th_color_fill
        for col in range(1, 3):
            self.ws3.cell(row=1, column=col).fill = th_color_fill

        for r in self.ws1:
            for c in r:
                c.alignment = openpyxl.styles.Alignment(vertical='center',
                                                        wrapText=True)
        for r in self.ws2:
            for c in r:
                c.alignment = openpyxl.styles.Alignment(vertical='center',
                                                        wrapText=True)
        for r in self.ws3:
            for c in r:
                c.alignment = openpyxl.styles.Alignment(vertical='center',
                                                        wrapText=True)

        # 保存并生成文件
        self.wb.save(f'XXX防火墙安全策略统计{self.create_time}.xlsx')


if __name__ == '__main__':

    # 类实例化
    x = SecPolicy2XLSX()
    # res = x.sp_comtent()
    # res = x.more_comtent()
    # res = x.more_ip_comtent()
    # for i in res:
    #     print(i)
    # x.ip_fill_task()
    x.write2ws1()
    x.write2ws2()
    x.write2ws3()
    x.save_to_file()
    print('搞快点-->>搞快点-->>')

导出的表格

ScreenCaputure221201214539

ScreenCaputure221201214703

ScreenCaputure221201214728

20221201220702