Python通过伪TTY清除SSH
我写了一个Python模块,用来处理我程序中的SSH连接:
#!/usr/bin/env python
from vxpty import VX_PTY
class SSHError(Exception):
def __init__(self, msg):
self.msg = msg
def __str__(self):
return repr(self.msg)
class SSHShell:
def __init__(self, host, port, user, password):
self.host = host
self.port = port
self.user = user
self.password = password
self.authenticated = False
def authenticate(self):
self.tty = VX_PTY(['/usr/bin/ssh', 'ssh', '-p'+str(self.port), self.user+'@'+self.host])
resp = self.tty.read()
if "authenticity of host" in resp:
self.tty.println('yes')
while 1:
resp = self.tty.read()
if "added" in resp:
break
resp = self.tty.read()
if "assword:" in resp:
self.tty.println(self.password)
tmp_resp = self.tty.read()
tmp_resp += self.tty.read()
if "denied" in tmp_resp or "assword:" in tmp_resp:
raise(SSHError("Authentication failed"))
else:
self.authenticated = True
self.tty.println("PS1=''")
return self.authenticated
def execute(self, os_cmd):
self.tty.println(os_cmd)
resp_buf = self.tty.read().replace(os_cmd+'\r\n', '')
return resp_buf
这个模块使用了我之前写的一个pty模块:
#!/usr/bin/env python
import os,pty
class PTYError(Exception):
def __init__(self, msg):
self.msg = msg
def __str__(self):
return repr(self.msg)
class VX_PTY:
def __init__(self, execlp_args):
self.execlp_args = execlp_args
self.pty_execlp(execlp_args)
def pty_execlp(self, execlp_args):
(self.pid, self.f) = pty.fork()
if self.pid==0:
os.execlp(*execlp_args)
elif self.pid<0:
raise(PTYError("Failed to fork pty"))
def read(self):
data = None
try:
data = os.read(self.f, 1024)
except Exception:
raise(PTYError("Read failed"))
return data
def write(self, data):
try:
os.write(self.f, data)
except Exception:
raise(PTYError("Write failed"))
def fsync(self):
os.fsync(self.f)
def seek_end(self):
os.lseek(self.f, os.SEEK_END, os.SEEK_CUR)
def println(self, ln):
self.write(ln+'\n')
但是,每次我调用execute()
方法时,结果总是先读取到第一行的输出:
>>> import SSH;shell=SSH.SSHShell('localhost',22,'735tesla','notmypassword');shell.authenticate()
True
>>> shell.execute('whoami')
"\x1b[?1034hLaptop:~ 735Tesla$ PS1=''\r\n"
>>>
然后第二次我调用read()
时,才会得到后面的输出:
>>> shell.tty.read()
'whoami\r\n735Tesla\r\n'
>>>
虽然把whoami\r\n
从输出中去掉不成问题,但有没有办法清空输出,这样我就不需要对第一个命令调用两次read()
了?
1 个回答
1
我觉得你的问题比你想的要复杂。不过,幸运的是,解决起来也比你想的简单。
你似乎希望 os.read
能一次性返回所有来自终端的输出。这其实是无法做到的。因为这取决于很多因素,比如终端的实现、网络带宽和延迟,还有你和远程主机的PTYs(伪终端)的表现。在每次调用 read
时,你可能会收到的数据量可以是所有内容,也可能只有一个字符。
如果你只想获取命令的输出,建议你在命令前后加上独特的标记,不用去调整PS1。我的意思是,你需要让终端在执行命令之前输出一个独特的字符串,然后在命令执行完后再输出另一个字符串。这样,你的 tty.read
方法就可以返回这两个标记字符串之间的所有文本。让终端输出这些独特字符串最简单的方法就是使用echo命令。
对于多行命令,你需要把命令放在一个终端函数里,并在执行这个函数之前和之后输出标记。
一个简单的实现方式如下:
def execute(self, cmd):
if '\n' in cmd:
self.pty.println(
'__cmd_func__(){\n%s\n' % cmd +
'}; echo __"cmd_start"__; __cmd_func__; echo __"cmd_end"__; unset -f __cmd_func__'
)
else:
self.pty.println('echo __"cmd_start"__; %s; echo __"cmd_end"__' % cmd)
resp = ''
while not '__cmd_start__\r\n' in resp:
resp += self.pty.read()
resp = resp[resp.find('__cmd_start__\r\n') + 15:] # 15 == len('__cmd_start__\r\n')
while not '_cmd_end__' in resp:
resp += self.pty.read()
return resp[:resp.find('__cmd_end__')]