多线程在工作完成后生成新进程

0 投票
2 回答
529 浏览
提问于 2025-04-18 09:47

我想定义一个有 n 个工人的工作池,让每个工人去执行放在 rabbitmq 队列里的任务。当某个任务完成后(无论成功还是失败),我希望这个工人能去执行队列里的另一个任务。

我在文档里看到如何创建一组工人,并让他们都等待其他工人完成任务。但我想要的有点不同:我希望有一个 n 个任务的缓冲区,当一个工人完成任务后,它能从缓冲区里再添加一个任务(这样缓冲区里最多只有 n 个任务)。我在文档里找这个功能时遇到了一些困难。

为了让你更明白,我的非多线程代码是这样的:

while True:
    message = get_frame_from_queue() # get message from rabbit mq
    do_task(message.body) #body defines urls to download file
    acknowledge_complete(message) # tell rabbitmq the message is acknowledged

在这个阶段,我的“多线程”实现会是这样的:

@recieves('ask_for_a_job')
def get_a_task():
    # this function is executed when `ask_for_a_job` signal is fired
    message = get_frame_from_queue()
    do_task(message)

def do_tasks(task_info):
    try:
        # do stuff
    finally:
        # once the "worker" has finished start another.
        fire_fignal('ask_for_a_job')

# start the "workers"
for i in range(5):
    fire_fignal('ask_for_a_job')

我不想重新发明轮子。有没有更简单的方法可以实现这个功能呢?

注意,get_frame_from_queue 这个函数不是线程安全的。

2 个回答

0

一种解决方法是使用 multiprocessing.Pool 这个对象。你可以先用一个外层循环从 RabbitMQ 获取 N 个项目。然后把这些项目放入池中,等所有的任务都完成后,再逐个处理这些项目,确认每条消息已经处理完毕。最后,继续外层循环。

来源

import multiprocessing

def worker(word):
    return bool(word=='whiskey')

messages = ['syrup', 'whiskey', 'bitters']

BATCHSIZE = 2
pool = multiprocessing.Pool(BATCHSIZE)

while messages:
    # take first few messages, one per worker
    batch,messages = messages[:BATCHSIZE],messages[BATCHSIZE:]

    print 'BATCH:',
    for res in pool.imap_unordered(worker, batch):
        print res,
    print

    # TODO: acknowledge msgs in 'batch'

输出

BATCH: False True
BATCH: False
2

你可以让每个子进程或线程直接从队列中获取消息,然后在每个线程中,就像同步处理一样,从队列中处理这些消息。

from threading import Thread

def do_task(msg):
   # Do stuff here

def consume():
    while True:
        message = get_frame_from_queue()
        do_task(message.body)
        acknowledge_complete(message)

if __name __ == "__main__":
    threads = []
    for i in range(5):
        t = Thread(target=consume)
        t.start()
        threads.append(t)

这样一来,你就可以同时处理N条来自队列的消息,不需要线程之间进行任何信号传递。

唯一需要注意的就是你使用的rabbitmq库的线程安全性。根据它的实现方式,你可能需要为每个线程建立一个单独的连接,或者可能只需要一个连接,但每个线程使用一个通道等等。

撰写回答