Python 进程因打开的 Paramiko SSH 连接而挂起
我正在使用Paramiko来监控远程机器上的日志,主要是在测试运行期间。
这个监控是在一个守护线程中进行的,基本上是这样做的:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
transport = ssh.get_transport()
channel = transport.open_session()
channel.exec_command('sudo tail -f ' + self.logfile)
last_partial = ''
while not self.stopped.isSet():
try:
if None == select or None == channel:
break
rl, wl, xl = select.select([channel], [], [], 1.0)
if None == rl:
break
if len(rl) > 0:
# Must be stdout, how can I check?
line = channel.recv(1024)
else:
time.sleep(1.0)
continue
except:
break
if line:
#handle saving the line... lines are 'merged' so that one log is made from all the sources
ssh.close()
我之前遇到过阻塞读取的问题,所以我开始用这种方式处理,大部分时候效果不错。不过我发现当网络变慢时,问题就会出现。
有时候在运行结束时(在上面设置了self.stopped之后),我会看到这个错误。我尝试在设置了stopped之后让程序休眠,并且等待所有监控线程结束,但有时还是会出现卡住的情况。
Exception in thread Thread-9 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
File "/usr/lib/python2.6/site-packages/paramiko/transport.py", line 1470, in run
<type 'exceptions.AttributeError'>: 'NoneType' object has no attribute 'error'
在Paramiko的transport.py文件中,我觉得错误可能出现在这里。可以看看下面的#<<<<<<<<<<<<<<<<<<<<<<<<<<<。
self._channel_handler_table[ptype](chan, m)
elif chanid in self.channels_seen:
self._log(DEBUG, 'Ignoring message for dead channel %d' % chanid)
else:
self._log(ERROR, 'Channel request for unknown channel %d' % chanid)
self.active = False
self.packetizer.close()
elif (self.auth_handler is not None) and (ptype in self.auth_handler._handler_table):
self.auth_handler._handler_table[ptype](self.auth_handler, m)
else:
self._log(WARNING, 'Oops, unhandled type %d' % ptype)
msg = Message()
msg.add_byte(cMSG_UNIMPLEMENTED)
msg.add_int(m.seqno)
self._send_message(msg)
except SSHException as e:
self._log(ERROR, 'Exception: ' + str(e))
self._log(ERROR, util.tb_strings()) #<<<<<<<<<<<<<<<<<<<<<<<<<<< line 1470
self.saved_exception = e
except EOFError as e:
self._log(DEBUG, 'EOF in transport thread')
#self._log(DEBUG, util.tb_strings())
self.saved_exception = e
except socket.error as e:
if type(e.args) is tuple:
if e.args:
emsg = '%s (%d)' % (e.args[1], e.args[0])
else: # empty tuple, e.g. socket.timeout
emsg = str(e) or repr(e)
else:
emsg = e.args
self._log(ERROR, 'Socket exception: ' + emsg)
self.saved_exception = e
except Exception as e:
self._log(ERROR, 'Unknown exception: ' + str(e))
self._log(ERROR, util.tb_strings())
当运行卡住时,我可以运行>>>>> sudo lsof -i -n | egrep '\'来查看确实有卡住的ssh连接(一直卡住)。我的主要测试进程的PID是15010。
sshd 6478 root 3u IPv4 46405 0t0 TCP *:ssh (LISTEN)
sshd 6478 root 4u IPv6 46407 0t0 TCP *:ssh (LISTEN)
sshd 14559 root 3r IPv4 3287615 0t0 TCP 172.16.0.171:ssh- >10.42.80.100:59913 (ESTABLISHED)
sshd 14563 cmead 3u IPv4 3287615 0t0 TCP 172.16.0.171:ssh->10.42.80.100:59913 (ESTABLISHED)
python 15010 root 12u IPv4 3291525 0t0 TCP 172.16.0.171:43227->172.16.0.142:ssh (ESTABLISHED)
python 15010 root 15u IPv4 3291542 0t0 TCP 172.16.0.171:41928->172.16.0.227:ssh (ESTABLISHED)
python 15010 root 16u IPv4 3291784 0t0 TCP 172.16.0.171:57682->172.16.0.48:ssh (ESTABLISHED)
python 15010 root 17u IPv4 3291779 0t0 TCP 172.16.0.171:43246->172.16.0.142:ssh (ESTABLISHED)
python 15010 root 20u IPv4 3291789 0t0 TCP 172.16.0.171:41949->172.16.0.227:ssh (ESTABLISHED)
python 15010 root 65u IPv4 3292014 0t0 TCP 172.16.0.171:51886->172.16.0.226:ssh (ESTABLISHED)
sshd 15106 root 3r IPv4 3292962 0t0 TCP 172.16.0.171:ssh->10.42.80.100:60540 (ESTABLISHED)
sshd 15110 cmead 3u IPv4 3292962 0t0 TCP 172.16.0.171:ssh->10.42.80.100:60540 (ESTABLISHED)
所以,我其实只想让我的进程不要卡住。哦,还有,我不想更新Paramiko,如果这需要把Python更新到2.6.6以上,因为我在centos上,听说更新超过2.6.6会比较“复杂”。
谢谢你的任何建议。
给shavenwarthog的评论,内容太长不能放在评论里:
嗨,感谢你的回答。我有几个快速的问题。1)如果我需要在不确定的时间停止线程怎么办?换句话说,tail -f blah.log线程可能会运行大约3分钟,而我想在这三分钟内检查累积的数据10次?2)我想这也是类似的,当我在一些实际的远程机器上尝试这个时,它不会退出(因为tail -f是不会退出的)。我之前忘了这一点,但我认为非阻塞读取是为了解决这个问题。你觉得你评论的另一个线程加上这个线程足够让这个工作吗?基本上是用我的非阻塞读取来收集每个运行线程的本地数据。然后我只需要在主线程想要从每个运行线程获取数据时上锁,这样似乎可以把我一个锁分散成大约10个锁,这样会有帮助。这样说有没有道理?
1 个回答
下面的代码可以在多个主机上运行一个命令。当每个命令有数据准备好时,这些数据会被打印到屏幕上。
整体结构是参考了Alex Martelli的代码。这个版本增加了更多的日志记录,包括显示每个连接的主机的可读版本。
原来的代码是为那些运行后就退出的命令写的。我把它改成了可以逐步打印数据,当数据可用时就打印。之前,第一个获取锁的线程会在read()
上被阻塞,导致所有线程都无法继续。新的解决方案避免了这个问题。
编辑,一些说明:
如果想在稍后的时间停止程序,我们会遇到一个比较棘手的情况。线程是不可中断的——我们不能简单地设置一个信号处理器来sys.exit()
程序。更新后的代码设置为在3秒后安全退出,通过使用一个while循环来join()
每个线程。对于真正的代码,如果父进程退出,那么线程也应该正确退出。请仔细注意代码中的两个警告,因为信号/退出/线程的交互比较复杂。
代码会在数据到达时处理数据——现在数据只是被打印到控制台。它没有使用非阻塞读取,因为1)非阻塞代码要复杂得多,2)原来的程序没有在父进程中处理子线程的数据。对于线程来说,所有操作都在子线程中进行更简单,子线程可以写入文件、数据库或服务。对于更复杂的情况,可以使用multiprocessing
,这要简单得多,并且有很好的功能来处理多个任务,并在任务崩溃时重新启动。那个库还允许你在多个CPU之间分配负载,而线程则不允许。
祝你玩得开心!
编辑 #2
注意,可以且可能更好地在不使用threading
或multiprocessing
的情况下运行多个进程。简而言之:使用Popen
和select()
循环来处理输出批次。可以在Pastebin查看示例代码:在不使用subprocess/multiprocessing的情况下运行多个命令
源代码
# adapted from https://stackoverflow.com/questions/3485428/creating-multiple-ssh-connections-at-a-time-using-paramiko
import signal, sys, threading
import paramiko
CMD = 'tail -f /var/log/syslog'
def signal_cleanup(_signum, _frame):
print '\nCLEANUP\n'
sys.exit(0)
def workon(host):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(host)
_stdin, stdout, _stderr = ssh.exec_command(CMD)
for line in stdout:
print threading.current_thread().name, line,
def main():
hosts = ['localhost', 'localhost']
# exit after a few seconds (see WARNINGs)
signal.signal(signal.SIGALRM, signal_cleanup)
signal.alarm(3)
threads = [
threading.Thread(
target=workon,
args=(host,),
name='host #{}'.format(num+1)
)
for num,host in enumerate(hosts)
]
print 'starting'
for t in threads:
# WARNING: daemon=True allows program to exit when main proc
# does; otherwise we'll wait until all threads complete.
t.daemon = True
t.start()
print 'joining'
for t in threads:
# WARNING: t.join() is uninterruptible; this while loop allows
# signals
# see: http://snakesthatbite.blogspot.com/2010/09/cpython-threading-interrupting.html
while t.is_alive():
t.join(timeout=0.1)
print 'done!'
if __name__=='__main__':
main()
输出
starting
joining
host #2 Jun 27 16:28:25 palabras kernel: [158950.369443] ideapad_laptop: Unknown event: 1
host #2 Jun 27 16:29:12 palabras kernel: [158997.098833] ideapad_laptop: Unknown event: 1
host #1 Jun 27 16:28:25 palabras kernel: [158950.369443] ideapad_laptop: Unknown event: 1
host #1 Jun 27 16:29:12 palabras kernel: [158997.098833] ideapad_laptop: Unknown event: 1
host #1 Jun 27 16:29:36 palabras kernel: [159020.809748] ideapad_laptop: Unknown event: 1