多线程在工作进程完成后生成新进程

2024-04-27 04:57:50 发布

您现在位置:Python中文网/ 问答频道 /正文

我想定义一个n个worker池,并将每个执行任务保存在rabbitmq队列中。当这个任务完成(失败或成功)时,我希望工作线程从队列中执行另一个任务。在

我可以在文档中看到如何生成一个工人池,并让他们都等待他们的兄弟姐妹完成。不过,我想用不同的方式:我希望有一个由n个任务组成的缓冲区,当一个工作人员完成时,它会将另一个任务添加到缓冲区中(因此,bugger中的任务不超过n个)。我很难在文档中搜索这个。在

对于上下文,我的非多线程代码是:

while True:
    message = get_frame_from_queue() # get message from rabbit mq
    do_task(message.body) #body defines urls to download file
    acknowledge_complete(message) # tell rabbitmq the message is acknowledged

在这个阶段,我的“多线程”实现如下所示:

^{pr2}$

我不想重新发明轮子。有没有一种更为内在的方式来实现这一点?在

注意get_frame_from_queue不是线程安全的。在


Tags: from文档messageget定义队列queue方式
2条回答

您应该能够让每个子进程/线程直接从队列中消费,然后在每个线程中,简单地像同步一样从队列中进行处理。在

from threading import Thread

def do_task(msg):
   # Do stuff here

def consume():
    while True:
        message = get_frame_from_queue()
        do_task(message.body)
        acknowledge_complete(message)

if __name __ == "__main__":
    threads = []
    for i in range(5):
        t = Thread(target=consume)
        t.start()
        threads.append(t)

这样,您将始终同时处理来自队列的N条消息,而不需要在线程之间发出任何信号。在

这里唯一的“问题”是您使用的rabbitmq库的线程安全性。根据它的实现方式,您可能需要每个线程有一个单独的连接,或者可能需要一个每个线程有一个通道的连接,等等

一种解决方案是利用multiprocessing.Pool对象。使用外部循环从RabbitMQ获取N个项目。将项目放入池中,等待整个批处理完成。然后在批处理中循环,确认每条消息。最后继续外循环。在

来源

import multiprocessing

def worker(word):
    return bool(word=='whiskey')

messages = ['syrup', 'whiskey', 'bitters']

BATCHSIZE = 2
pool = multiprocessing.Pool(BATCHSIZE)

while messages:
    # take first few messages, one per worker
    batch,messages = messages[:BATCHSIZE],messages[BATCHSIZE:]

    print 'BATCH:',
    for res in pool.imap_unordered(worker, batch):
        print res,
    print

    # TODO: acknowledge msgs in 'batch'

输出

^{pr2}$

相关问题 更多 >