当前位置:   article > 正文

python paramiko_python使用paramiko实现ssh

python paramiko_python使用paramiko实现ssh

代码

myssh.py

import utils

import paramiko

class SSHConnection(object):

def __init__(self, host, port=22, proxy_name="proxy"):

self._host = host

self._port = int(port)

self._tunnel_server, self._host, self._port, self._username, self._passwords \

= utils.get_common_ssh_tunnel(proxy_name, self._host, self._port)

self._password = None

self._transport = None

self._sftp = None

self._client = None

self._connect() # 建立连接

def _connect(self):

if self._tunnel_server:

self._tunnel_server.start()

passwords = self._passwords.split("\n")

success = False

transport = None

for password in passwords:

try:

transport = paramiko.Transport((self._host, self._port))

transport.connect(username=self._username, password=password)

self._password = password

success = True

except Exception as e:

if transport:

transport.close()

if "Authentication failed." in e:

continue

if success:

self._transport = transport

else:

raise paramiko.AuthenticationException

# 下载

def download(self, remotepath, localpath):

try:

if self._sftp is None:

self._sftp = paramiko.SFTPClient.from_transport(self._transport)

self._sftp.get(remotepath, localpath)

except Exception as e:

raise e

# 上传

def put(self, localpath, remotepath):

try:

if self._sftp is None:

self._sftp = paramiko.SFTPClient.from_transport(self._transport)

self._sftp.put(localpath, remotepath)

except Exception as e:

raise e

# 执行命令

def exec_command(self, command):

try:

if self._client is None:

self._client = paramiko.SSHClient()

self._client._transport = self._transport

stdin, stdout, stderr = self._client.exec_command(command)

data = stdout.read()

if len(data) > 0:

print data.strip() # 打印正确结果

return data

err = stderr.read()

if len(err) > 0:

print err.strip() # 输出错误结果

return err

except Exception as e:

raise e

def close(self):

if self._transport:

self._transport.close()

if self._client:

self._client.close()

if self._tunnel_server:

self._tunnel_server.close()

utils.py

def get_common_ssh_tunnel(proxy_server, remote_host, remote_port):

session = models.get_db_session()

remote_server = None

try:

remote_server = session.query(models.Server).filter(models.Server.host == remote_host).one()

except Exception as e:

if "No row was found for one()" in e.message:

remote_server = session.query(models.Server).filter(models.Server.name == "common").one()

remote_server.host = remote_host

remote_server.port = remote_port

if not remote_server.need_jump:

return None, remote_server.host, remote_server.port, remote_server

local_port = utils.get_open_port()

proxy = session.query(models.Server).filter(models.Server.name == proxy_server).one()

session.close()

return SSHTunnelForwarder(

(proxy.host, int(proxy.port)),

ssh_username=proxy.username,

ssh_password=proxy.password,

remote_bind_address=(remote_host, int(remote_port)),

local_bind_address=("0.0.0.0", int(local_port))

), "127.0.0.1", local_port, remote_server.username, remote_server.password

参考

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/562015
推荐阅读
相关标签
  

闽ICP备14008679号