Python:使用线程多次调用subprocess.Popen
我有一个正在运行的服务(Twisted jsonrpc 服务器)。当我调用“run_procs”时,服务会查看一堆对象,检查它们的时间戳属性,以确定它们是否应该运行。如果应该运行,它们就会被添加到一个线程池(列表)中,然后线程池中的每个项目都会调用 start() 方法。
我在其他几个应用中也使用过这种设置,想在我的类中使用线程来运行一个函数。然而,当我在每个线程调用的函数中使用 subprocess.Popen 时,这些调用是一个接一个地执行,而不是像我预期的那样并行运行。
以下是一些示例代码:
class ProcService(jsonrpc.JSONRPC):
self.thread_pool = []
self.running_threads = []
self.lock = threading.Lock()
def clean_pool(self, thread_pool, join=False):
for th in [x for x in thread_pool if not x.isAlive()]:
if join: th.join()
thread_pool.remove(th)
del th
return thread_pool
def run_threads(self, parallel=10):
while len(self.running_threads)+len(self.thread_pool) > 0:
self.clean_pool(self.running_threads, join=True)
n = min(max(parallel - len(self.running_threads), 0), len(self.thread_pool))
if n > 0:
for th in self.thread_pool[0:n]: th.start()
self.running_threads.extend(self.thread_pool[0:n])
del self.thread_pool[0:n]
time.sleep(.01)
for th in self.running_threads+self.thread_pool: th.join()
def jsonrpc_run_procs(self):
for i, item in enumerate(self.items):
if item.should_run():
self.thread_pool.append(threading.Thread(target=self.run_proc, args=tuple([item])))
self.run_threads(5)
def run_proc(self, proc):
self.lock.acquire()
print "\nSubprocess started"
p = subprocess.Popen('%s/program_to_run.py %s' %(os.getcwd(), proc.data), shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE,)
stdout_value = proc.communicate('through stdin to stdout')[0]
self.lock.release()
任何帮助或建议都非常感谢。
* 编辑 *
好的。现在我想从 stdout 管道中读取输出。这有时能工作,但也会出现 select.error: (4, 'Interrupted system call') 的错误。我猜这是因为有时进程在我尝试运行 communicate 方法之前就已经结束了。run_proc 方法中的代码已更改为:
def run_proc(self, proc):
self.lock.acquire()
p = subprocess.Popen( #等等
self.running_procs.append([p, proc.data.id])
self.lock.release()
在我调用 self.run_threads(5) 之后,我调用 self.check_procs()
check_procs 方法会遍历 running_procs 列表,检查 poll() 是否不为 None。我该如何从管道中获取输出?我尝试了以下两种方法:
calling check_procs once:
def check_procs(self):
for proc_details in self.running_procs:
proc = proc_details[0]
while (proc.poll() == None):
time.sleep(0.1)
stdout_value = proc.communicate('through stdin to stdout')[0]
self.running_procs.remove(proc_details)
print proc_details[1], stdout_value
del proc_details
calling check_procs in while loop like:
while len(self.running_procs) > 0:
self.check_procs()
def check_procs(self):
for proc_details in self.running_procs:
if (proc.poll() is not None):
stdout_value = proc.communicate('through stdin to stdout')[0]
self.running_procs.remove(proc_details)
print proc_details[1], stdout_value
del proc_details
2 个回答
你遇到的问题可能是因为这一行代码 stdout_value = proc.communicate('through stdin to stdout')[0]
。这个方法会让程序等到进程结束后再继续执行,而如果你在使用锁的情况下,它会一个接一个地运行。
你可以做的是把 p
这个变量放到一个列表里,然后使用子进程的API来等待这些子进程完成。在主线程中定期检查每个子进程的状态。
再仔细看看,你可能在这一行也有问题: for th in self.running_threads+self.thread_pool: th.join()
。这个 join()
方法也是用来等待线程结束的。
我觉得关键的代码是:
self.lock.acquire()
print "\nSubprocess started"
p = subprocess.Popen( # etc
stdout_value = proc.communicate('through stdin to stdout')[0]
self.lock.release()
在这里明确调用获取和释放锁的操作应该能保证代码的顺序执行——如果你在这个代码块里做其他事情,而不是使用子进程,你难道不也能观察到顺序执行吗?
编辑:这里大家都没发言,所以我建议去掉锁的使用,改为把每个 stdout_value
放到一个 Queue.Queue()
实例里——因为队列本身就是线程安全的(它自己处理锁的问题),所以你可以在结果准备好并放入队列后,使用 get
(或者 get_nowait
等等)来获取结果。一般来说,Queue
是在 Python 中安排线程间通信(有时也包括同步)的最佳方式,只要能做到这样。
具体来说:在开头加上 import Queue
;放弃创建、获取和释放 self.lock
(直接删掉那三行);在 __init__
中加上 self.q = Queue.Queue()
;在调用 stdout_value = proc.communicate(...
之后,添加一行 self.q.put(stdout_value)
;然后例如用下面的代码结束 jsonrpc_run_procs
方法:
while not self.q.empty():
result = self.q.get()
print 'One result is %r' % result
以确认所有结果都在那儿。(通常队列的 empty
方法不太可靠,但在这个情况下,所有放入队列的线程都已经完成,所以应该没问题)。