在Python线程中消费RabbitMQ队列
这段内容有点长。
我有一份用户名和密码的列表。我想用这些账号登录并进行一些操作。为了加快速度,我想用几台机器来同时处理。我打算让一台主机器负责定时检查一个叫做rabbitmq的队列是否为空。如果队列是空的,它就会从一个文件中读取用户名和密码,然后把这些信息放到rabbitmq队列里。接着,我会有几台机器订阅这个队列,它们的工作是接收用户名和密码,进行操作,确认完成后再处理下一个,直到队列空了,然后主机器再填充队列。到目前为止,我觉得我的思路是对的。
现在我遇到的问题是,我检查过每个用户的操作并不复杂,所以我可以让每台机器同时处理三个操作,使用Python的线程功能。实际上,我已经在一台机器上实现了这个功能:我把用户名和密码加载到一个Python的队列中,然后用三个线程来处理这个队列。现在我想做类似的事情,但不是从Python的队列中取数据,而是让每台机器的每个线程从rabbitmq队列中取数据。这就是我现在卡住的地方。为了测试,我开始使用rabbitmq的教程。
send.py:
import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
message = ' '.join(sys.argv[1:])
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
connection.close()
worker.py
import time, pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print ' [x] received %r' % (body,)
time.sleep( body.count('.') )
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='hello', no_ack=False)
channel.start_consuming()
根据上面的内容,你可以运行两个worker.py,它们会订阅rabbitmq队列并按预期工作。
我在没有使用rabbitmq的情况下的线程代码大致是这样的:
runit.py
class Threaded_do_stuff(threading.Thread):
def __init__(self, user_queue):
threading.Thread.__init__(self)
self.user_queue = user_queue
def run(self):
while True:
login = self.user_queue.get()
do_stuff(user=login[0], pass=login[1])
self.user_queue.task_done()
user_queue = Queue.Queue()
for i in range(3):
td = Threaded_do_stuff(user_queue)
td.setDaemon(True)
td.start()
## fill up the queue
for user in list_users:
user_queue.put(user)
## go!
user_queue.join()
这个也按预期工作:你填充队列,然后有三个线程订阅它。现在我想做的事情是类似于runit.py,但不是使用Python的队列,而是使用worker.py,其中队列实际上是一个rabbitmq队列。
我尝试过的一个方法没有成功(我也不明白为什么):
rabbitmq_runit.py
import time, threading, pika
class Threaded_worker(threading.Thread):
def callback(self, ch, method, properties, body):
print ' [x] received %r' % (body,)
time.sleep( body.count('.') )
ch.basic_ack(delivery_tag = method.delivery_tag)
def __init__(self):
threading.Thread.__init__(self)
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
self.channel.queue_declare(queue='hello')
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.callback, queue='hello')
def run(self):
print 'start consuming'
self.channel.start_consuming()
for _ in range(3):
print 'launch thread'
td = Threaded_worker()
td.setDaemon(True)
td.start()
我本以为这会启动三个线程,每个线程都会被.start_consuming()阻塞,等待rabbitmq队列发送数据给它们。结果是,这个程序启动后,打印了一些内容,然后就退出了。退出的模式也很奇怪:
launch thread
launch thread
start consuming
launch thread
start consuming
特别注意,有一个“开始消费”的步骤缺失了。
这是怎么回事?
补充:我找到的一个类似问题的答案在这里:使用多个线程消费rabbitmq消息队列(Python Kombu),答案是“使用celery”,这我不太理解。我觉得我不需要像celery那样复杂的东西。特别是,我并不是在设置RPC,也不需要从do_stuff的例程中读取回复。
补充2:我预期的打印模式应该是这样的。我执行:
python send.py first message......
python send.py second message.
python send.py third message.
python send.py fourth message.
然后打印的模式应该是:
launch thread
start consuming
[x] received 'first message......'
launch thread
start consuming
[x] received 'second message.'
launch thread
start consuming
[x] received 'third message.'
[x] received 'fourth message.'
1 个回答
问题在于你把线程设置成了守护线程:
td = Threaded_worker()
td.setDaemon(True) # Shouldn't do that.
td.start()
守护线程会在主线程退出时立刻被终止:
线程可以被标记为“守护线程”。这个标记的意思是,当只剩下守护线程时,整个Python程序就会退出。这个标记的初始值是从创建线程那里继承来的。你可以通过daemon属性来设置这个标记。
如果不使用 setDaemon(True)
,你应该能看到它按你预期的方式运行。
另外,pika的常见问题中有提到如何在使用线程时使用它:
Pika的代码中没有线程的概念。如果你想在使用线程时使用Pika,确保每个线程都有一个Pika连接,并且是在那个线程中创建的。跨线程共享一个Pika连接是不安全的。
这意味着你应该把在 __init__()
中做的所有事情都移到 run()
中,这样连接就会在你实际从队列中消费的同一个线程中创建。