多线程在工作完成后生成新进程
我想定义一个有 n 个工人的工作池,让每个工人去执行放在 rabbitmq 队列里的任务。当某个任务完成后(无论成功还是失败),我希望这个工人能去执行队列里的另一个任务。
我在文档里看到如何创建一组工人,并让他们都等待其他工人完成任务。但我想要的有点不同:我希望有一个 n 个任务的缓冲区,当一个工人完成任务后,它能从缓冲区里再添加一个任务(这样缓冲区里最多只有 n 个任务)。我在文档里找这个功能时遇到了一些困难。
为了让你更明白,我的非多线程代码是这样的:
while True:
message = get_frame_from_queue() # get message from rabbit mq
do_task(message.body) #body defines urls to download file
acknowledge_complete(message) # tell rabbitmq the message is acknowledged
在这个阶段,我的“多线程”实现会是这样的:
@recieves('ask_for_a_job')
def get_a_task():
# this function is executed when `ask_for_a_job` signal is fired
message = get_frame_from_queue()
do_task(message)
def do_tasks(task_info):
try:
# do stuff
finally:
# once the "worker" has finished start another.
fire_fignal('ask_for_a_job')
# start the "workers"
for i in range(5):
fire_fignal('ask_for_a_job')
我不想重新发明轮子。有没有更简单的方法可以实现这个功能呢?
注意,get_frame_from_queue
这个函数不是线程安全的。
2 个回答
0
一种解决方法是使用 multiprocessing.Pool
这个对象。你可以先用一个外层循环从 RabbitMQ 获取 N 个项目。然后把这些项目放入池中,等所有的任务都完成后,再逐个处理这些项目,确认每条消息已经处理完毕。最后,继续外层循环。
来源
import multiprocessing
def worker(word):
return bool(word=='whiskey')
messages = ['syrup', 'whiskey', 'bitters']
BATCHSIZE = 2
pool = multiprocessing.Pool(BATCHSIZE)
while messages:
# take first few messages, one per worker
batch,messages = messages[:BATCHSIZE],messages[BATCHSIZE:]
print 'BATCH:',
for res in pool.imap_unordered(worker, batch):
print res,
print
# TODO: acknowledge msgs in 'batch'
输出
BATCH: False True
BATCH: False
2
你可以让每个子进程或线程直接从队列中获取消息,然后在每个线程中,就像同步处理一样,从队列中处理这些消息。
from threading import Thread
def do_task(msg):
# Do stuff here
def consume():
while True:
message = get_frame_from_queue()
do_task(message.body)
acknowledge_complete(message)
if __name __ == "__main__":
threads = []
for i in range(5):
t = Thread(target=consume)
t.start()
threads.append(t)
这样一来,你就可以同时处理N条来自队列的消息,不需要线程之间进行任何信号传递。
唯一需要注意的就是你使用的rabbitmq库的线程安全性。根据它的实现方式,你可能需要为每个线程建立一个单独的连接,或者可能只需要一个连接,但每个线程使用一个通道等等。