Python 队列并发进程管理

2 投票
3 回答
769 浏览
提问于 2025-04-16 06:29

使用场景如下:

我有一个脚本,它会运行一系列非Python的程序来处理(脉冲星)数据。目前我使用的是subprocess.Popen(..., shell=True),然后用subprocess的communicate函数来捕获这些非Python程序的标准输出和标准错误信息,捕获到的输出我会用Python的日志模块记录下来。

问题是:现在大部分时间只使用了8个核心中的一个。我想同时启动多个进程,每个进程处理数据集的一部分,并且我想跟踪进度。这是一个用来分析低频射电望远镜(LOFAR)数据的脚本/程序。越容易安装、管理和测试越好。我本来打算自己写代码来管理这些,但我相信应该已经有一些简单的库可以实现这个功能。

3 个回答

0

如果我理解得没错,你现在的做法可以稍微调整一下。我建议你先把一个完整的工作流程做成一个函数,然后再加上并行处理的部分。比如说:

  1. 把现在的功能(调用子进程并获取输出)封装成一个函数。这个函数可以创建一个结果对象并返回;或者,你也可以让这个函数把结果写入文件,随你喜欢。
  2. 创建一个可迭代的对象(比如列表),里面包含第一步每一块数据的输入。
  3. 创建一个多进程池,然后利用它的map()功能来对第二步中的每个项目执行第一步的函数。具体细节可以参考Python的多进程文档。

你也可以使用工作者/队列模型。我觉得关键是把当前的子进程和输出捕获的内容封装成一个函数,这个函数负责处理一块数据(不管那是什么)。然后,使用几种方法中的任何一种来添加并行处理的部分就变得很简单,这里只提到了一小部分。

2

也许Celery可以满足你的需求。

2

subprocess模块可以帮你启动多个进程,并且能够跟踪这些进程的状态。不过,问题在于如何在不影响其他进程的情况下读取每个进程的输出。根据不同的操作系统,有几种方法可以做到这一点:可以使用select模块来查看哪个进程有数据可以读取,或者使用fnctl模块将输出管道设置为非阻塞模式,还可以使用线程来读取每个进程的数据(在Windows上,subprocess.Popen.communicate就是用这种方式,因为它没有其他两种选择)。不过每种方法都有其复杂之处。

一个可以帮你处理这些问题的工具是Twisted,它可以根据你的需求启动任意数量的进程,并且可以在这些进程生成数据时调用你的回调函数(以及处理其他情况)。

撰写回答