#!/usr/bin/env python
# coding=utf-8
"""
python 版本3.7
SSH登陆华为交换机执行批量命令
"""
import asyncio
import time
import asyncssh
import sys
import re
import traceback
from controller.core import getIni
username = getIni("switch", "username")
password = getIni("switch", "password")
port = getIni("switch", "port")
switch_host_process_dic = dict()
session_timeout = 300 # 交换机session有效期时间(秒)
def validate_the_command_return_end(line):
"""
交换机判断下发命令是否返回结束
在SSH执行命令时,额外多发送了2个#号,就是此处用来捕获后判断是否命令返回结束
"""
search_str_content = ">|]"
search_end_content = "##\r\n"
search_str_result = re.search(search_str_content, line)
search_end_result = re.search(search_end_content, line)
return search_str_result and search_end_result
class MySSHClient(asyncssh.SSHClient):
"""
回调方法
"""
def connection_made(self, conn):
"""
创建连接是回调
:param conn:
:return:
"""
print('SSH connection received from %s.' %
conn.get_extra_info('peername')[0])
def connection_lost(self, exc):
"""
连接失败时回回调
:param exc:
:return:
"""
if exc:
print('SSH connection error: ' + str(exc), file=sys.stderr)
else:
print('SSH connection closed.')
async def run_client(host: str, cmds_dict: dict) -> (dict or bool):
"""
异步执行SSH,并把执行返回的结果放到队列返回
:param host: 交换机IP
:param cmds_dict: 执行的命令列表
:return:
"""
curr_time = int(time.time()) # 当前时间
if host not in switch_host_process_dic:
switch_host_process_dic[host] = {
"lock": asyncio.Lock()
}
results = {} # 存储要返回的结果
try:
# 复用session,交换机默认session有效期是10分钟,过期失效要重新登陆
process_create_time = switch_host_process_dic[host].get("create_time", 0) # 获取进程创建时间
process_time_out = curr_time - process_create_time > session_timeout # 判断交换机session对象是否超时
if switch_host_process_dic[host].get("process", None) and not process_time_out:
print(f"复用 {host} 连接")
process = switch_host_process_dic[host]["process"]
switch_host_process_dic[host]["create_time"] = curr_time
else:
lock = switch_host_process_dic[host]["lock"]
await lock.acquire()
process_create_time = switch_host_process_dic[host].get("create_time", 0) # 获取进程创建时间
process_time_out = curr_time - process_create_time > session_timeout # 判断交换机session对象是否超时
if not switch_host_process_dic[host].get("process", None) or process_time_out:
print(f"与 {host} 创建新连接!")
conn, client = await asyncio.wait_for(
asyncssh.create_connection(MySSHClient, host=host, port=int(port),
username=username, password=password, known_hosts=None,
client_keys=None), timeout=30)
process = await conn.create_process(stderr=asyncssh.STDOUT)
print(f"{host} 连接已建立!")
# 存储对象,在session过期前可以复用,免去ssh验陆验证和减少会话数(交换机默认用户数是5个)
switch_host_process_dic[host]["process"] = process
switch_host_process_dic[host]["create_time"] = curr_time
else:
print(f"复用 {host} 连接")
process = switch_host_process_dic[host]["process"]
switch_host_process_dic[host]["create_time"] = curr_time
lock.release()
for host_key, cmds in cmds_dict.items():
results[host_key] = {}
for i, cmd in enumerate(cmds):
if i == 0:
# 在执行第一条命令前,先输入去除"More"功能(关闭分屏功能),
cmd_content = f"""
return
screen-length 0 temporary
{cmd}
"""
else:
cmd_content = cmd
process.stdin.write(cmd_content + '\n##\n')
while 1:
result = await process.stdout.readline()
if validate_the_command_return_end(result) or not result:
# 如果检测到数据最后一行,则退出循环
break
if cmd not in results[host_key]:
results[host_key][cmd] = result
else:
results[host_key][cmd] = results[host_key].get(cmd) + result
if result.find('More') > 0:
# 如果出现"Mone"字段,则继续发送回车,让后面数据显示出来
process.stdin.write('\n##\n')
return results
except asyncssh.misc.PermissionDenied:
print("帐号或密码错误,登陆失败!")
return False
except Exception:
print(traceback.format_exc())
return False
您可以选择一种方式赞助本站
支付宝扫一扫赞助
微信钱包扫描赞助
赏