生产者/消费者 多个生产者和一个消费者写入文件 Python

3 投票
2 回答
4704 浏览
提问于 2025-04-17 01:01

我的需求和多个生产者,单个消费者类似,不过我需要用Python来实现。

我创建了一个应用程序,它会同时启动5个进程(我使用的是multiprocessing库)。这5个进程各自独立地生成字典格式的输出。

之前我把输出打印到控制台,但现在我想把它输出到一个文件里。

我在寻找一种模式,让我的5个生产者可以把数据写入一个支持同时写入的共享队列。

同时,还有一个单独的消费者进程,它也能访问这个队列,从中获取数据。如果没有数据可写,它可以等待,等到生产者完成任务后再结束。

谢谢,Anuj

2 个回答

1

我在Python中实现了一个模式,主要是一个管理进程会启动一堆子进程,然后从这些子进程中获取日志信息,并把这些日志写入一个统一的日志文件。

简单来说,我使用了execve来启动这些进程,并且指定每个进程的错误输出(stderr)连接到一个伪终端(PTY)。接着,我的管理进程打开了所有的主PTY,并使用select在一个循环中读取它们的数据。PTY是通过终端行控制来进行行缓冲的,你可以在它们上面使用readline进行非阻塞读取。我还记得我在PTY上使用了fcntl来设置os.O_NONBLOCK。

这个方法效果很好。唯一的问题是,当你从select的轮询返回时,需要每个PTY读取多于一行的数据,否则可能会丢失输出(假设你有其他东西在处理子进程并重启它们)。通过读取每个PTY上所有可用的行,你也可以避免错误追踪信息和其他消息混在一起。

如果你真的需要发送对象而不是文本行,那你最好使用一个真正的发布-订阅消息系统,比如AMQP或ZeroMQ。AMQP功能强大,但可能对你来说有点过于复杂,所以只有在你打算构建很多类似的应用时再考虑它。否则,可以试试更简单的0MQ,http://www.zeromq.org/intro:read-the-manual,它只是一个消息库,让使用套接字变得简单得多。

1

因为你已经在使用多进程,所以你只需要用到队列这个类。

这里有一个示例(是从队列文档中修改过来的)

from multiprocessing import Process, Queue

def child(q, url):
    result = my_process(url)
    q.put(result)

if __name__ == '__main__':
    q = Queue()
    urls = [...]
    children = []
    for url in urls:
       p = Process(target=child, args=(q,url))
       p.start()
       children.append(p)
    for p in children:
       p.join()
       print q.get() #or write to file (might not be the answer from this child)

补充: 如果每个子进程需要返回多个答案,可以把最后的for循环替换成:

while 0 != multiprocessing.active_children():
    print q.get()

撰写回答