使用paramiko的multiprocess模块
我正在尝试使用paramiko这个Python模块(版本1.7.7.1)来同时在一组远程服务器上执行命令和/或传输文件。一个任务大致是这样的:
jobs = []
for obj in appObjs:
if obj.stop_app:
p = multiprocessing.Process(target=exec_cmd, args=(obj, obj.stop_cmd))
jobs.append(p)
print "Starting job %s" % (p)
p.start()
这里的“obj”包含了很多东西,其中包括一个paramiko的SSHClient、传输工具和SFTPClient。appObjs这个列表大约有25个这样的对象,也就是说我同时连接了25台不同的服务器。
但是,我在paramiko的transport.py文件中遇到了以下错误:
raise AssertionError("PID check failed. RNG must be re-initialized after fork().
Hint: Try Random.atfork()")
我根据这个帖子中的内容修补了/usr/lib/python2.6/site-packages/paramiko/transport.py,链接是https://github.com/newsapps/beeswithmachineguns/issues/17,但似乎没有什么帮助。我已经确认了上面提到的路径中的transport.py确实是正在使用的那个。看起来paramiko的邮件列表也消失了。
这看起来是paramiko的问题,还是我对多进程模块的理解或使用有误?有没有人愿意提供一个实际的解决办法?非常感谢!
2 个回答
在一个关于 Paramiko 的问题 的评论中提到,RNG 错误可以通过为每个进程打开一个单独的 SSH 连接来避免。这样,Paramiko 就不会再报错了。下面这个示例脚本演示了这个方法(我使用的是池而不是进程):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import ssh
from multiprocessing import Pool
import getpass
hostnames = [HOST1, HOST2]
user = USERNAME
pw = getpass.getpass("Enter ssh password:")
def processFunc(hostname):
handle = ssh.SSHClient()
handle.set_missing_host_key_policy(ssh.AutoAddPolicy())
handle.connect(hostname, username=user, password=pw)
print("child")
stdin, stdout, stderr = handle.exec_command("ls -l /var/log; sleep 5")
cmdOutput = ""
while True:
try:
cmdOutput += stdout.next()
except StopIteration:
break
print("Got output from host %s:%s" % (hostname, cmdOutput))
handle.close()
pool = Pool(len(hostnames))
pool.map(processFunc, hostnames, 1)
pool.close()
pool.join()
## If you want to compare speed:
# for hostname in hostnames:
# processFunc(hostname)
更新:正如@ento所提到的,分叉的ssh包已经被重新合并回Paramiko,所以下面的内容现在不再适用,你应该重新使用Paramiko。
这是Paramiko中的一个已知问题,已经在一个分叉的版本中修复了(停留在1.7.7.1版本),现在这个版本被称为pypi上的ssh包(截至目前,版本更新到了1.7.11)。
显然,在将一些重要的修复合并到主版本的Paramiko时遇到了问题,而维护者也没有回应。因此,@bitprophet,Fabric的维护者,决定在新的包名pypi上的ssh包下分叉Paramiko。你提到的具体问题可以在这里查看,这也是他决定分叉的原因之一;如果你真的想了解更多,可以阅读详细信息。