如何使用paramiko实现路由器交换机批量配置的自动化学习?

摘要:摘要: 本笔记记录了使用paramiko库批量配置路由器交换机的过程,包括环境搭建、代码实现和注意事项。 声明 本工具仅用于学习和实验,不建议在生产环境中使用。 本文由完全AI基于脚本生成,脚本已在华三模拟器环境中测试通过。 脚本由本人编写
摘要: 本笔记记录了使用paramiko库批量配置路由器交换机的过程,包括环境搭建、代码实现和注意事项。 声明 本工具仅用于学习和实验,不建议在生产环境中使用。 本文由完全AI基于脚本生成,脚本已在华三模拟器环境中测试通过。 脚本由本人编写,并由AI辅助纠错,生成。 🚀 一款简洁实用的 SSH 批量命令执行工具(基于 paramiko) 在最近练习时,结合模拟器环境,写了一个小工具:基于 Python + Paramiko 的 SSH 批量命令执行器,它可以读取 ips.csv 里的设备列表、按设备型号加载对应命令集、依次登陆设备执行命令并生成每台设备的操作日志(包括命令输出)。 它已经在我的华三模拟器里跑通了,下面给大家分享这份脚本的核心设计、使用方法和可扩展点。 📌 核心功能概览 ✅ 支持批量设备并行执行 从 ips.csv 读取设备 IP + 设备型号(model) 针对不同型号加载不同命令文件(commands/<model>.txt) 根据 config.json 中 max_workers 设置(默认 5)并发登录设备执行命令。 ✅ 支持密码/密钥双认证 config.json 中配置 username/password 或 key_path/key_passphrase 脚本会自动决定使用密钥还是密码连接 ✅ 逐设备生成「操作报告文件」 在 report/<timestamp>/ 目录下生成: 全局 ssh_execution_log_<timestamp>.txt 每台设备对应的 IP_sysname_model_operate.txt(包含登录欢迎信息 + 命令输出 + 时间戳) 生成汇总 CSV 文件 report/<timestamp>/ips_operate_summary.csv,包含 IP、系统名、型号、操作状态(成功/失败)、操作日志路径。 ✅ 智能获取设备「系统名」 登录后通过 shell 提示符(例如 <Huawei>、user@host#)自动抓取系统名,用于文件命名和日志记录 🧱 关键文件结构(本脚本相关) mydoc/ └─ para-deep.py # 本脚本:批量 SSH 执行核心逻辑 commands/ ├─ default.txt # 默认命令集(如果 型号 是 default) ├─ huawei_1.txt # 每个型号一套命令 ├─ huawei_2.txt ├─ H3C_1.txt ├─ H3C_2.txt └─ ... ips.csv # 设备列表:IP, 型号 config.json # SSH 认证配置,同时连接设备数量 ⚙️ 配置方法(只需三步即可运行) 1) 准备 config.json 示例(密码认证): { "username": "admin", "password": "MySecretPwd" } 示例(密钥认证): { "username": "admin", "key_path": "~/.ssh/id_rsa", "key_passphrase": "密钥口令(如果有)" } 2) 准备 ips.csv 不同型号的设备,命令集文件名中可以包含型号相关信息(例如 huawei_xxxx.txt),脚本会根据 ips.csv 中指定的型号加载对应的命令文件。 型号标记错误时,脚本会默认加载 default.txt 命令集,但可能因命令集不适配导致失败。 示例格式: 192.168.1.10,huawei_xxxx 192.168.1.11,h3c_S6850 # 192.168.1.12,default # 以#开头可注释掉 第一列:IP 第二列:设备型号(对应 commands/<model>.txt) 3) 准备 commands/<model>.txt 以 commands/huawei.txt 为例: display version display current-configuration display interface brief 脚本会按顺序将这些命令发送给设备,并将输出保存到操作文件中。 🧠 脚本亮点解读(核心模块) 完整脚本 import paramiko # SSH连接库 import time # 延时控制 import json # 读取配置文件 import csv # 读取IP列表 import os # 文件路径操作 import sys # 系统退出功能 import re # 正则表达式处理 from datetime import datetime # 用于日志时间戳 from concurrent.futures import ThreadPoolExecutor, as_completed from paramiko import SSHException, AuthenticationException # 异常类型 BASE_DIR = os.path.dirname(__file__) # 获取脚本所在目录 # 全局变量,将在main中设置 LOG_FILE = None CURRENT_REPORT_DIR = None class DeviceSession: """设备会话管理类,封装每个设备的连接和文件操作""" def __init__(self, ip, model, cfg, report_dir): self.ip = ip self.model = model self.cfg = cfg self.report_dir = report_dir self.sysname = None self.operate_file = None self.client = None self.shell = None self.op_output = b"" def setup_operate_file(self): """设置操作文件路径(在获取sysname后调用)""" if not self.sysname: self.sysname = "unknown" # 生成安全的文件名(移除可能不合法的字符) safe_sysname = re.sub(r'[\\/*?:"<>|]', "_", self.sysname) filename = f"{self.ip}_{safe_sysname}_{self.model}_operate.txt" self.operate_file = os.path.join(self.report_dir, filename) # 写入头部信息 with open(self.operate_file, 'w', encoding='utf-8') as f: f.write(f"设备IP: {self.ip}\n") f.write(f"设备型号: {self.model}\n") f.write(f"系统名称: {self.sysname}\n") f.write(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write("=" * 60 + "\n\n") log_message(f"[{self.ip}] 操作文件已创建: {filename}") def write_operate(self, content, also_log=False): """写入操作文件""" if self.operate_file: with open(self.operate_file, 'a', encoding='utf-8') as f: f.write(content) if not content.endswith('\n'): f.write('\n') if also_log: log_message(content) def get_ssh_config(self): """获取SSH连接配置""" cfg = self.cfg if not isinstance(cfg, dict): cfg = {} username = cfg.get("username") if not username: raise ValueError("必需在config.json文件中配置username") password = cfg.get("password") port = cfg.get("port", 22) key_path = cfg.get("key_path") key_pass = cfg.get("key_passphrase") pkey = None # 只有在明确配置了key_path时才尝试使用密钥认证 if key_path and key_path.strip(): # 如果提供了密钥路径,尝试加载私钥文件 key_path = os.path.expanduser(key_path) if not os.path.isabs(key_path): key_path = os.path.join(BASE_DIR, key_path) try: pkey = self.load_pkey(key_path, key_pass) log_message(f"已加载私钥文件: {key_path}", also_print=False) except Exception as e: log_message(f"加载私钥失败 ({key_path}): {e},将使用密码认证", also_print=True) pkey = None else: log_message("未配置密钥,将使用密码认证", also_print=False) return username, password, port, pkey def load_pkey(self, path, passphrase=None): """SSH密钥加载函数""" key_classes = [paramiko.RSAKey, paramiko.ECDSAKey, getattr(paramiko, 'Ed25519Key', None), paramiko.DSSKey] for cls in key_classes: if cls is None: continue try: return cls.from_private_key_file(path, password=passphrase) except Exception: continue raise ValueError(f"Unable to load private key from {path}") def get_system_name(self): """从shell会话中获取系统名称""" try: # 发送换行以获取提示符 #self.shell.send("\n") time.sleep(1.5) # 读取输出 output = b"" while self.shell.recv_ready(): chunk = self.shell.recv(65535).replace(b'\x00', b'') output += chunk time.sleep(0.1) # 保存欢迎信息 self.op_output = output # 解码输出 output_text = output.decode('ASCII', errors='ignore').replace('\r', '') # 将欢迎信息写入操作文件 welcome_info = f"{output_text}\n" self.write_operate(welcome_info) # 尝试多种提示符模式匹配 patterns = [ r'[<\[](.*?)[>\]]', # <Huawei> 或 [H3C] r'(\w+)[@#>\$]', # user@hostname> 或 root@localhost# r'^(\S+)[>\$#]', # 行首的主机名后跟 > $ # ] for pattern in patterns: match = re.search(pattern, output_text, re.MULTILINE) if match: self.sysname = match.group(1).strip() # 设置操作文件(现在有了sysname) self.setup_operate_file() # 重新写入欢迎信息(因为文件刚创建) self.write_operate(welcome_info) return self.sysname # 如果没有匹配到,使用unknown self.sysname = "unknown" self.setup_operate_file() self.write_operate(welcome_info) return None except Exception as e: log_message(f"[{self.ip}] 获取系统名称时出错: {e}") self.sysname = "unknown" self.setup_operate_file() return None def load_commands(self): """加载设备对应的命令列表""" # 在当前目录和父目录查找commands文件夹 cmd_dir = None for d in (BASE_DIR, os.path.dirname(BASE_DIR)): cand = os.path.join(d, "commands") if os.path.isdir(cand): cmd_dir = cand break if not cmd_dir: raise FileNotFoundError("commands/ directory not found") path = os.path.join(cmd_dir, f"{self.model}.txt") if not os.path.isfile(path): raise FileNotFoundError(path) with open(path, "r", encoding="utf-8") as f: lines = [ln.rstrip() for ln in f if ln.strip() and not ln.lstrip().startswith("#")] log_message(f"为型号 '{self.model}' 加载了 {len(lines)} 条命令", also_print=False) return lines def execute(self): """执行设备操作的主方法""" started_at = datetime.now() try: # 获取SSH配置 username, password, port, pkey = self.get_ssh_config() log_message(f"[{self.ip}] 正在连接 (型号:{self.model}, 用户名: {username}, 端口: {port}, 认证方式: {'密钥' if pkey else '密码'})...") timeout = self.cfg.get("connect_timeout", 10) # 加载设备对应的命令列表 try: cmds = self.load_commands() except Exception as e: log_message(f"[{self.ip}] 缺少型号 '{self.model}' 的命令文件: {e}") return { 'ip': self.ip, 'model': self.model, 'sysname': self.sysname, 'operate_file': self.operate_file, 'success': False, 'error': f"缺少命令文件: {e}", 'start_time': started_at.strftime('%Y-%m-%d %H:%M:%S'), 'end_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), } # 创建SSH客户端 self.client = paramiko.SSHClient() self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 根据是否有密钥选择不同的连接方式 connect_params = { 'hostname': self.ip, 'port': port, 'username': username, 'timeout': timeout, 'allow_agent': False, 'look_for_keys': False } if pkey: connect_params['pkey'] = pkey log_message(f"[{self.ip}] 正在使用密钥认证连接...") else: if not password: log_message(f"[{self.ip}] 错误: 未提供密码且无有效密钥") return { 'ip': self.ip, 'model': self.model, 'sysname': self.sysname, 'operate_file': self.operate_file, 'success': False, 'error': '未提供密码且无有效密钥', 'start_time': started_at.strftime('%Y-%m-%d %H:%M:%S'), 'end_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), } connect_params['password'] = password log_message(f"[{self.ip}] 正在使用密码认证连接...") self.client.connect(**connect_params) time.sleep(0.1) # 创建交互式shell会话 self.shell = self.client.invoke_shell() # 获取系统名(这会自动设置operate_file) sysname = self.get_system_name() if sysname: log_message(f"已成功连接至[{self.ip}],系统名称: {sysname}") else: log_message(f"[{self.ip}] 连接成功,但无法获取系统名称") time.sleep(0.3) # 记录开始执行命令 #self.write_operate(f"\n--- 开始执行命令,共 {len(cmds)} 条 ---\n") # 逐条发送命令 for i, cmd in enumerate(cmds, 1): #cmd_info = f"\n>>> 命令 [{i}/{len(cmds)}]: {cmd}\n" #self.write_operate(cmd_info) log_message(f"[{self.ip}] 发送命令 [{i}/{len(cmds)}]: {cmd}", also_print=False) #self.shell.send("\n") self.shell.send(cmd + "\n") time.sleep(0.5) # 等待命令执行 # 收集所有命令的输出 time.sleep(0.5) output = b"" while self.shell.recv_ready(): output += self.shell.recv(65535).replace(b'\x00', b'') time.sleep(0.1) # 解码并记录输出 output_text = output.decode('ASCII', errors='ignore').replace('\r', '') # 将输出写入操作文件 if output_text: self.write_operate(f"{output_text}") log_message(f"[{self.ip}] 命令执行完成,输出长度: {len(output_text)} 字符") else: self.write_operate("\n--- 命令执行无输出 ---") log_message(f"[{self.ip}] 命令执行完成,无输出") # 记录完成标记 self.write_operate(f"\n{'='*60}\n设备{self.sysname} {self.ip} 命令执行完成于: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n{'='*60}") return { 'ip': self.ip, 'model': self.model, 'sysname': self.sysname, 'operate_file': self.operate_file, 'success': True, 'error': None, 'start_time': started_at.strftime('%Y-%m-%d %H:%M:%S'), 'end_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), } except AuthenticationException as e: log_message(f"[{self.ip}] 认证失败: {e}") if self.operate_file: self.write_operate(f"\n错误: 认证失败 - {e}") return { 'ip': self.ip, 'model': self.model, 'sysname': self.sysname, 'operate_file': self.operate_file, 'success': False, 'error': f"认证失败: {e}", 'start_time': started_at.strftime('%Y-%m-%d %H:%M:%S'), 'end_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), } except SSHException as e: log_message(f"[{self.ip}] SSH协议错误: {e}") if self.operate_file: self.write_operate(f"\n错误: SSH协议错误 - {e}") return { 'ip': self.ip, 'model': self.model, 'sysname': self.sysname, 'operate_file': self.operate_file, 'success': False, 'error': f"SSH协议错误: {e}", 'start_time': started_at.strftime('%Y-%m-%d %H:%M:%S'), 'end_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), } except Exception as e: log_message(f"[{self.ip}] 连接失败: {e}") if self.operate_file: self.write_operate(f"\n错误: {e}") return { 'ip': self.ip, 'model': self.model, 'sysname': self.sysname, 'operate_file': self.operate_file, 'success': False, 'error': str(e), 'start_time': started_at.strftime('%Y-%m-%d %H:%M:%S'), 'end_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), } finally: self.close() def close(self): """关闭连接""" try: if self.shell: self.shell.close() if self.client: self.client.close() log_message(f"[{self.ip}] 连接已关闭", also_print=False) except Exception: pass def log_message(message, also_print=True): """将消息同时写入日志文件和终端""" global LOG_FILE timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') log_entry = f"[{timestamp}] {message}" if also_print: print(message) # 写入日志文件 if LOG_FILE: with open(LOG_FILE, 'a', encoding='utf-8') as f: f.write(log_entry + '\n') def find_file_upwards(start_dir, filename, max_levels=2): """ 向上级目录查找文件 start_dir: 起始目录 filename: 要查找的文件名 max_levels: 最多向上查找2层目录 """ d = start_dir for _ in range(max_levels + 1): candidate = os.path.join(d, filename) if os.path.isfile(candidate): return candidate d = os.path.dirname(d) if not d: break return None def load_config(): """加载config.json配置文件""" path = find_file_upwards(BASE_DIR, "config.json") if not path: log_message("警告: 未找到config.json文件,将使用默认配置", also_print=True) return {} with open(path, "r", encoding="utf-8") as f: config = json.load(f) log_message(f"成功加载配置文件: {path}", also_print=True) return config def load_ips(): """加载IP列表""" path = find_file_upwards(BASE_DIR, "ips.csv") if not path: raise FileNotFoundError("没有在当前目录或父目录中找到ips.csv文件") ips = [] with open(path, "r", encoding="utf-8") as f: reader = csv.reader(f) for row in reader: if not row or row[0].strip().startswith("#"): continue ip = row[0].strip() model = row[1].strip() if len(row) > 1 else "default" ips.append((ip, model)) log_message(f"成功加载IP列表,共 {len(ips)} 个设备", also_print=True) return ips def main(): global LOG_FILE, CURRENT_REPORT_DIR # 创建报告目录 REPORT_BASE = os.path.join(BASE_DIR, "report") TIMESTAMP = datetime.now().strftime('%Y%m%d_%H%M%S') CURRENT_REPORT_DIR = os.path.join(REPORT_BASE, TIMESTAMP) if not os.path.exists(CURRENT_REPORT_DIR): os.makedirs(CURRENT_REPORT_DIR) # 设置日志文件 LOG_FILE = os.path.join(CURRENT_REPORT_DIR, f"ssh_execution_log_{TIMESTAMP}.txt") # 初始化日志文件 with open(LOG_FILE, 'w', encoding='utf-8') as f: f.write(f"SSH批量执行日志 - 开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"报告目录: {CURRENT_REPORT_DIR}\n") f.write("=" * 60 + "\n\n") log_message("=" * 60) log_message("SSH批量命令执行工具启动") log_message(f"日志文件: {LOG_FILE}") log_message(f"报告目录: {CURRENT_REPORT_DIR}") log_message("=" * 60) # 加载配置文件 try: cfg = load_config() # 打印配置信息(隐藏密码) if cfg: safe_cfg = cfg.copy() if 'password' in safe_cfg: safe_cfg['password'] = '***' log_message(f"配置信息: {safe_cfg}") except Exception as e: log_message(f"加载配置文件失败: {e}") cfg = {} # 加载IP列表 try: ips = load_ips() except Exception as e: log_message(f"加载IP列表失败: {e}") sys.exit(1) log_message(f"开始处理 {len(ips)} 个设备...") log_message("-" * 60) # 并行执行配置 max_workers = cfg.get('max_workers', 5) try: max_workers = int(max_workers) except Exception: max_workers = 5 max_workers = max(1, max_workers) log_message(f"并发线程数: {max_workers}", also_print=False) # 处理每个设备 success_count = 0 fail_count = 0 results = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_ip = {} for ip, model in ips: session = DeviceSession(ip, model, cfg, CURRENT_REPORT_DIR) future = executor.submit(session.execute) future_to_ip[future] = (ip, model) for future in as_completed(future_to_ip): ip, model = future_to_ip[future] try: res = future.result() if not res: log_message(f"[{ip}] 未获得执行结果(可能发生异常)") fail_count += 1 results.append({ 'ip': ip, 'model': model, 'sysname': None, 'operate_file': None, 'success': False, 'error': '未获取到执行结果', 'start_time': None, 'end_time': None, }) else: results.append(res) if res.get('success'): success_count += 1 else: fail_count += 1 except Exception as e: log_message(f"[{ip}] 处理过程中发生未预期错误: {e}") fail_count += 1 results.append({ 'ip': ip, 'model': model, 'sysname': None, 'operate_file': None, 'success': False, 'error': str(e), 'start_time': None, 'end_time': None, }) log_message("-" * 40) # 生成执行汇总表 summary_file = os.path.join(CURRENT_REPORT_DIR, 'summary.csv') with open(summary_file, 'w', encoding='utf-8', newline='') as f: fieldnames = ['ip', 'model', 'sysname', 'success', 'error', 'operate_file', 'start_time', 'end_time'] writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for item in results: writer.writerow({k: item.get(k, '') for k in fieldnames}) # 输出统计信息 log_message("=" * 60) log_message(f"执行完成!成功: {success_count} 个设备, 失败: {fail_count} 个设备") log_message(f"详细日志已保存至: {LOG_FILE}") log_message(f"操作文件保存在: {CURRENT_REPORT_DIR}") log_message(f"执行汇总已保存至: {summary_file}") log_message("=" * 60) if __name__ == "__main__": main() ✅ DeviceSession:设备会话管理 建立 SSH 连接 打开 shell 读取欢迎信息并解析 sysname 发送命令并收集输出 统一写入操作文件 + 日志 ✅ 日志&报表输出逻辑 日志文件: report/<timestamp>/ssh_execution_log_<timestamp>.txt 每台设备操作文件: report/<timestamp>/<ip>_<sysname>_<model>_operate.txt 🧩 可拓展点(你可以怎么改进它) ✅ 增加命令结果过滤/关键字匹配(只保存指定输出段) ✅ 增加重试机制(断线重连/失败重试) ✅ 支持更多协议:Telnet、Netconf、SNMP 🏁 运行方式(直接用 Python 运行) python xxxxxxxx.py 脚本会自动生成 report/ 目录,里面包含本次执行所有设备的详细日志。 💡 小提示 如果遇到 “找不到 commands/ 目录” 这样的错误,请确认脚本目录层级是否正确,commands/ 应与脚本同级或上一级目录存在(代码做了向上查找 2 级)。 如果你希望输出更干净、只保留命令执行结果,可以修改 write_operate() 里写入内容的格式,或在命令前后加分隔符。 🎉 总结 这个基于 paramiko 的 SSH 批量命令执行工具,虽然功能简单,但已经具备了批量设备管理的核心能力。通过合理的文件结构和日志设计,它可以帮助网络管理员快速对大量设备进行配置检查和状态收集。