生产者/消费者多生产者和单个消费者写入Python文件

2024-04-20 09:15:15 发布

您现在位置:Python中文网/ 问答频道 /正文

我的要求类似于Multiple producers, single consumer 但我需要用python编写

我创建了一个生成5个并发进程的应用程序(我正在使用多进程库),这5个进程独立地以dict格式生成输出。在

我想现在输出一个文件。在

我正在寻找一个模式,其中我的5个生产者写入一个共享队列,支持并发写入。在

单个使用者进程也可以访问此队列并使用其中的数据,如果没有数据可写入,则可以等待,并在生产者完成任务时终止。在

谢谢Anuj


Tags: 文件数据应用程序队列进程consumer格式模式
2条回答

由于您已经在使用多进程,所以您只需要Queue类

和一个示例(根据队列文档修改)

from multiprocessing import Process, Queue

def child(q, url):
    result = my_process(url)
    q.put(result)

if __name__ == '__main__':
    q = Queue()
    urls = [...]
    children = []
    for url in urls:
       p = Process(target=child, args=(q,url))
       p.start()
       children.append(p)
    for p in children:
       p.join()
       print q.get() #or write to file (might not be the answer from this child)

编辑: 对于每个子级的多个答案,将最后一个For循环替换为:

^{pr2}$

我已经在Python中实现了这个模式,其中一个主管进程生成一组进程,然后使用所有进程的日志消息,并将这些日志消息写入单个日志文件。

基本上,我使用exece来spawan进程,并指定每个进程的stderr连接到一个PTY。然后我的主管打开了所有的主pty,并使用select在一个循环中读取它们。PTY由tty line规程进行行缓冲,您可以在它们上使用readline进行非阻塞读取。我相信我在PTYs上也使用了fcntl来设置os.O_nblock。在

效果很好。唯一的问题是,当您从select poll返回时,您需要为每个pty读取多个行,否则您可能会丢失输出(假设您有一些获取子进程并重新启动的内容)。通过读取每个PTY上可用的所有行,还可以避免回溯与其他消息交叉。在

如果您确实需要发送对象而不是文本行,那么最好使用真正的发布-订阅消息传递系统,如AMQP或ZeroMQ。AMQP是一个比你需要的更大的锤子,所以只有当你希望构建大量类似的应用程序时才去看看。否则,请尝试更简单的0MQhttp://www.zeromq.org/intro:read-the-manual,它只是一个消息传递库,它使套接字更易于使用。在

相关问题 更多 >