Python:使用线程多次调用subprocess.Popen

-1 投票
2 回答
4181 浏览
提问于 2025-04-15 13:30

我有一个正在运行的服务(Twisted jsonrpc 服务器)。当我调用“run_procs”时,服务会查看一堆对象,检查它们的时间戳属性,以确定它们是否应该运行。如果应该运行,它们就会被添加到一个线程池(列表)中,然后线程池中的每个项目都会调用 start() 方法。

我在其他几个应用中也使用过这种设置,想在我的类中使用线程来运行一个函数。然而,当我在每个线程调用的函数中使用 subprocess.Popen 时,这些调用是一个接一个地执行,而不是像我预期的那样并行运行。

以下是一些示例代码:

class ProcService(jsonrpc.JSONRPC):
        self.thread_pool = []
        self.running_threads = []
        self.lock = threading.Lock()

        def clean_pool(self, thread_pool, join=False):
                for th in [x for x in thread_pool if not x.isAlive()]:
                        if join: th.join()
                        thread_pool.remove(th)
                        del th
                return thread_pool

        def run_threads(self, parallel=10):
                while len(self.running_threads)+len(self.thread_pool) > 0:
                        self.clean_pool(self.running_threads, join=True)
                        n = min(max(parallel - len(self.running_threads), 0), len(self.thread_pool))
                        if n > 0:
                                for th in self.thread_pool[0:n]: th.start()
                                self.running_threads.extend(self.thread_pool[0:n])
                                del self.thread_pool[0:n]
                        time.sleep(.01)
                for th in self.running_threads+self.thread_pool: th.join()

        def jsonrpc_run_procs(self):
                for i, item in enumerate(self.items):
                        if item.should_run():
                                self.thread_pool.append(threading.Thread(target=self.run_proc, args=tuple([item])))
                self.run_threads(5)

        def run_proc(self, proc):
                self.lock.acquire()
                print "\nSubprocess started"
                p = subprocess.Popen('%s/program_to_run.py %s' %(os.getcwd(), proc.data), shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE,)
                stdout_value = proc.communicate('through stdin to stdout')[0]
                self.lock.release()

任何帮助或建议都非常感谢。

* 编辑 *
好的。现在我想从 stdout 管道中读取输出。这有时能工作,但也会出现 select.error: (4, 'Interrupted system call') 的错误。我猜这是因为有时进程在我尝试运行 communicate 方法之前就已经结束了。run_proc 方法中的代码已更改为:

def run_proc(self, proc):
self.lock.acquire()
p = subprocess.Popen( #等等
self.running_procs.append([p, proc.data.id])
self.lock.release()

在我调用 self.run_threads(5) 之后,我调用 self.check_procs()

check_procs 方法会遍历 running_procs 列表,检查 poll() 是否不为 None。我该如何从管道中获取输出?我尝试了以下两种方法:

calling check_procs once:

def check_procs(self):
    for proc_details in self.running_procs:
        proc = proc_details[0]
        while (proc.poll() == None):
            time.sleep(0.1)
        stdout_value = proc.communicate('through stdin to stdout')[0]
        self.running_procs.remove(proc_details)
        print proc_details[1], stdout_value
        del proc_details

calling check_procs in while loop like:

while len(self.running_procs) > 0:
    self.check_procs()

def check_procs(self):
    for proc_details in self.running_procs:
        if (proc.poll() is not None):
            stdout_value = proc.communicate('through stdin to stdout')[0]
            self.running_procs.remove(proc_details)
            print proc_details[1], stdout_value
            del proc_details

2 个回答

1

你遇到的问题可能是因为这一行代码 stdout_value = proc.communicate('through stdin to stdout')[0]。这个方法会让程序等到进程结束后再继续执行,而如果你在使用锁的情况下,它会一个接一个地运行。

你可以做的是把 p 这个变量放到一个列表里,然后使用子进程的API来等待这些子进程完成。在主线程中定期检查每个子进程的状态。

再仔细看看,你可能在这一行也有问题: for th in self.running_threads+self.thread_pool: th.join()。这个 join() 方法也是用来等待线程结束的。

1

我觉得关键的代码是:

    self.lock.acquire()
    print "\nSubprocess started"
    p = subprocess.Popen( # etc
    stdout_value = proc.communicate('through stdin to stdout')[0]
    self.lock.release()

在这里明确调用获取和释放锁的操作应该能保证代码的顺序执行——如果你在这个代码块里做其他事情,而不是使用子进程,你难道不也能观察到顺序执行吗?

编辑:这里大家都没发言,所以我建议去掉锁的使用,改为把每个 stdout_value 放到一个 Queue.Queue() 实例里——因为队列本身就是线程安全的(它自己处理锁的问题),所以你可以在结果准备好并放入队列后,使用 get(或者 get_nowait 等等)来获取结果。一般来说,Queue 是在 Python 中安排线程间通信(有时也包括同步)的最佳方式,只要能做到这样。

具体来说:在开头加上 import Queue;放弃创建、获取和释放 self.lock(直接删掉那三行);在 __init__ 中加上 self.q = Queue.Queue();在调用 stdout_value = proc.communicate(... 之后,添加一行 self.q.put(stdout_value);然后例如用下面的代码结束 jsonrpc_run_procs 方法:

while not self.q.empty():
  result = self.q.get()
  print 'One result is %r' % result

以确认所有结果都在那儿。(通常队列的 empty 方法不太可靠,但在这个情况下,所有放入队列的线程都已经完成,所以应该没问题)。

撰写回答