生产者/消费者 多个生产者和一个消费者写入文件 Python
我的需求和多个生产者,单个消费者类似,不过我需要用Python来实现。
我创建了一个应用程序,它会同时启动5个进程(我使用的是multiprocessing库)。这5个进程各自独立地生成字典格式的输出。
之前我把输出打印到控制台,但现在我想把它输出到一个文件里。
我在寻找一种模式,让我的5个生产者可以把数据写入一个支持同时写入的共享队列。
同时,还有一个单独的消费者进程,它也能访问这个队列,从中获取数据。如果没有数据可写,它可以等待,等到生产者完成任务后再结束。
谢谢,Anuj
2 个回答
我在Python中实现了一个模式,主要是一个管理进程会启动一堆子进程,然后从这些子进程中获取日志信息,并把这些日志写入一个统一的日志文件。
简单来说,我使用了execve来启动这些进程,并且指定每个进程的错误输出(stderr)连接到一个伪终端(PTY)。接着,我的管理进程打开了所有的主PTY,并使用select
在一个循环中读取它们的数据。PTY是通过终端行控制来进行行缓冲的,你可以在它们上面使用readline进行非阻塞读取。我还记得我在PTY上使用了fcntl来设置os.O_NONBLOCK。
这个方法效果很好。唯一的问题是,当你从select的轮询返回时,需要每个PTY读取多于一行的数据,否则可能会丢失输出(假设你有其他东西在处理子进程并重启它们)。通过读取每个PTY上所有可用的行,你也可以避免错误追踪信息和其他消息混在一起。
如果你真的需要发送对象而不是文本行,那你最好使用一个真正的发布-订阅消息系统,比如AMQP或ZeroMQ。AMQP功能强大,但可能对你来说有点过于复杂,所以只有在你打算构建很多类似的应用时再考虑它。否则,可以试试更简单的0MQ,http://www.zeromq.org/intro:read-the-manual,它只是一个消息库,让使用套接字变得简单得多。
因为你已经在使用多进程,所以你只需要用到队列这个类。
这里有一个示例(是从队列文档中修改过来的)
from multiprocessing import Process, Queue
def child(q, url):
result = my_process(url)
q.put(result)
if __name__ == '__main__':
q = Queue()
urls = [...]
children = []
for url in urls:
p = Process(target=child, args=(q,url))
p.start()
children.append(p)
for p in children:
p.join()
print q.get() #or write to file (might not be the answer from this child)
补充: 如果每个子进程需要返回多个答案,可以把最后的for循环替换成:
while 0 != multiprocessing.active_children():
print q.get()