ssh连接交换机设备

2019 年 7 月 23 日14:46:59 评论 408 views
#!/usr/bin/env python
# coding=utf-8
"""
    python 版本3.7
    SSH登陆华为交换机执行批量命令
"""

import asyncio
import time
import asyncssh
import sys
import re
import traceback

from controller.core import getIni

username = getIni("switch", "username")
password = getIni("switch", "password")
port = getIni("switch", "port")
switch_host_process_dic = dict()
session_timeout = 300  # 交换机session有效期时间(秒)


def validate_the_command_return_end(line):
    """
    交换机判断下发命令是否返回结束
    在SSH执行命令时,额外多发送了2个#号,就是此处用来捕获后判断是否命令返回结束
    """
    search_str_content = ">|]"
    search_end_content = "##\r\n"
    search_str_result = re.search(search_str_content, line)
    search_end_result = re.search(search_end_content, line)
    return search_str_result and search_end_result


class MySSHClient(asyncssh.SSHClient):
    """
    回调方法
    """

    def connection_made(self, conn):
        """
        创建连接是回调
        :param conn:
        :return:
        """
        print('SSH connection received from %s.' %
              conn.get_extra_info('peername')[0])

    def connection_lost(self, exc):
        """
        连接失败时回回调
        :param exc:
        :return:
        """
        if exc:
            print('SSH connection error: ' + str(exc), file=sys.stderr)
        else:
            print('SSH connection closed.')


async def run_client(host: str, cmds_dict: dict) -> (dict or bool):
    """
    异步执行SSH,并把执行返回的结果放到队列返回
    :param host: 交换机IP
    :param cmds_dict: 执行的命令列表
    :return:
    """
    curr_time = int(time.time())  # 当前时间
    if host not in switch_host_process_dic:
        switch_host_process_dic[host] = {
            "lock": asyncio.Lock()
        }
    results = {}  # 存储要返回的结果
    try:
        # 复用session,交换机默认session有效期是10分钟,过期失效要重新登陆
        process_create_time = switch_host_process_dic[host].get("create_time", 0)       # 获取进程创建时间
        process_time_out = curr_time - process_create_time > session_timeout            # 判断交换机session对象是否超时
        if switch_host_process_dic[host].get("process", None) and not process_time_out:
            print(f"复用 {host} 连接")
            process = switch_host_process_dic[host]["process"]
            switch_host_process_dic[host]["create_time"] = curr_time
        else:
            lock = switch_host_process_dic[host]["lock"]
            await lock.acquire()
            process_create_time = switch_host_process_dic[host].get("create_time", 0)   # 获取进程创建时间
            process_time_out = curr_time - process_create_time > session_timeout        # 判断交换机session对象是否超时
            if not switch_host_process_dic[host].get("process", None) or process_time_out:
                print(f"与 {host} 创建新连接!")
                conn, client = await asyncio.wait_for(
                    asyncssh.create_connection(MySSHClient, host=host, port=int(port),
                                               username=username, password=password, known_hosts=None,
                                               client_keys=None), timeout=30)
                process = await conn.create_process(stderr=asyncssh.STDOUT)
                print(f"{host} 连接已建立!")

                # 存储对象,在session过期前可以复用,免去ssh验陆验证和减少会话数(交换机默认用户数是5个)
                switch_host_process_dic[host]["process"] = process
                switch_host_process_dic[host]["create_time"] = curr_time
            else:
                print(f"复用 {host} 连接")
                process = switch_host_process_dic[host]["process"]
                switch_host_process_dic[host]["create_time"] = curr_time
            lock.release()

        for host_key, cmds in cmds_dict.items():
            results[host_key] = {}
            for i, cmd in enumerate(cmds):
                if i == 0:
                    # 在执行第一条命令前,先输入去除"More"功能(关闭分屏功能),
                    cmd_content = f"""
                    return 
                    screen-length 0 temporary
                    {cmd}
                    """
                else:
                    cmd_content = cmd
                process.stdin.write(cmd_content + '\n##\n')
                while 1:
                    result = await process.stdout.readline()
                    if validate_the_command_return_end(result) or not result:
                        # 如果检测到数据最后一行,则退出循环
                        break

                    if cmd not in results[host_key]:
                        results[host_key][cmd] = result
                    else:
                        results[host_key][cmd] = results[host_key].get(cmd) + result

                    if result.find('More') > 0:
                        # 如果出现"Mone"字段,则继续发送回车,让后面数据显示出来
                        process.stdin.write('\n##\n')
        return results
    except asyncssh.misc.PermissionDenied:
        print("帐号或密码错误,登陆失败!")
        return False
    except Exception:
        print(traceback.format_exc())
        return False

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: