在Python线程中消费RabbitMQ队列

12 投票
1 回答
16664 浏览
提问于 2025-04-18 18:27

这段内容有点长。

我有一份用户名和密码的列表。我想用这些账号登录并进行一些操作。为了加快速度,我想用几台机器来同时处理。我打算让一台主机器负责定时检查一个叫做rabbitmq的队列是否为空。如果队列是空的,它就会从一个文件中读取用户名和密码,然后把这些信息放到rabbitmq队列里。接着,我会有几台机器订阅这个队列,它们的工作是接收用户名和密码,进行操作,确认完成后再处理下一个,直到队列空了,然后主机器再填充队列。到目前为止,我觉得我的思路是对的。

现在我遇到的问题是,我检查过每个用户的操作并不复杂,所以我可以让每台机器同时处理三个操作,使用Python的线程功能。实际上,我已经在一台机器上实现了这个功能:我把用户名和密码加载到一个Python的队列中,然后用三个线程来处理这个队列。现在我想做类似的事情,但不是从Python的队列中取数据,而是让每台机器的每个线程从rabbitmq队列中取数据。这就是我现在卡住的地方。为了测试,我开始使用rabbitmq的教程。

send.py:

import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

message = ' '.join(sys.argv[1:])
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
connection.close()

worker.py

import time, pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print ' [x] received %r' % (body,)
    time.sleep( body.count('.') )
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='hello', no_ack=False)
channel.start_consuming()

根据上面的内容,你可以运行两个worker.py,它们会订阅rabbitmq队列并按预期工作。

我在没有使用rabbitmq的情况下的线程代码大致是这样的:

runit.py

class Threaded_do_stuff(threading.Thread):
    def __init__(self, user_queue):
        threading.Thread.__init__(self)
        self.user_queue = user_queue

    def run(self):
        while True:
            login = self.user_queue.get()
            do_stuff(user=login[0], pass=login[1])
            self.user_queue.task_done()

user_queue = Queue.Queue()
for i in range(3):
    td = Threaded_do_stuff(user_queue)
    td.setDaemon(True)
    td.start()

## fill up the queue
for user in list_users:
    user_queue.put(user)

## go!
user_queue.join()

这个也按预期工作:你填充队列,然后有三个线程订阅它。现在我想做的事情是类似于runit.py,但不是使用Python的队列,而是使用worker.py,其中队列实际上是一个rabbitmq队列。

我尝试过的一个方法没有成功(我也不明白为什么):

rabbitmq_runit.py

import time, threading, pika

class Threaded_worker(threading.Thread):
    def callback(self, ch, method, properties, body):
        print ' [x] received %r' % (body,)
        time.sleep( body.count('.') )
        ch.basic_ack(delivery_tag = method.delivery_tag)

    def __init__(self):
        threading.Thread.__init__(self)
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='hello')
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(self.callback, queue='hello')

    def run(self):
        print 'start consuming'
        self.channel.start_consuming()

for _ in range(3):
    print 'launch thread'
    td = Threaded_worker()
    td.setDaemon(True)
    td.start()

我本以为这会启动三个线程,每个线程都会被.start_consuming()阻塞,等待rabbitmq队列发送数据给它们。结果是,这个程序启动后,打印了一些内容,然后就退出了。退出的模式也很奇怪:

launch thread
launch thread
start consuming
launch thread
start consuming

特别注意,有一个“开始消费”的步骤缺失了。

这是怎么回事?

补充:我找到的一个类似问题的答案在这里:使用多个线程消费rabbitmq消息队列(Python Kombu),答案是“使用celery”,这我不太理解。我觉得我不需要像celery那样复杂的东西。特别是,我并不是在设置RPC,也不需要从do_stuff的例程中读取回复。

补充2:我预期的打印模式应该是这样的。我执行:

python send.py first message......
python send.py second message.
python send.py third message.
python send.py fourth message.

然后打印的模式应该是:

launch thread
start consuming
 [x] received 'first message......'
launch thread
start consuming
 [x] received 'second message.'
launch thread
start consuming
 [x] received 'third message.'
 [x] received 'fourth message.'

1 个回答

18

问题在于你把线程设置成了守护线程:

td = Threaded_worker()
td.setDaemon(True)  # Shouldn't do that.
td.start()

守护线程会在主线程退出时立刻被终止:

线程可以被标记为“守护线程”。这个标记的意思是,当只剩下守护线程时,整个Python程序就会退出。这个标记的初始值是从创建线程那里继承来的。你可以通过daemon属性来设置这个标记。

如果不使用 setDaemon(True),你应该能看到它按你预期的方式运行。

另外,pika的常见问题中有提到如何在使用线程时使用它:

Pika的代码中没有线程的概念。如果你想在使用线程时使用Pika,确保每个线程都有一个Pika连接,并且是在那个线程中创建的。跨线程共享一个Pika连接是不安全的。

这意味着你应该把在 __init__() 中做的所有事情都移到 run() 中,这样连接就会在你实际从队列中消费的同一个线程中创建。

撰写回答