使用Python、Pika和AMQP设计异步RPC应用的最佳模式是什么?
我的应用程序的生产者模块是由希望在一个小集群上提交工作的用户运行的。它通过RabbitMQ消息中介以JSON格式发送订阅信息。
我尝试了几种策略,目前最好的方法是这样的,但仍然没有完全成功:
每台集群机器上都运行一个消费者模块,它会订阅AMQP队列,并设置一个prefetch_count,告诉中介它可以同时处理多少个任务。
我使用Pika AMQP库中的SelectConnection让它工作。生产者和消费者各自启动两个通道,一个连接到每个队列。生产者在通道[A]上发送请求,并在通道[B]上等待响应,而消费者在通道[A]上等待请求,并在通道[B]上发送响应。然而,似乎当消费者运行计算响应的回调时,它会被阻塞,所以每次每个消费者只能执行一个任务。
我最终需要的是:
- 消费者[A]将他的任务(每次大约5000个)提交给集群
- 中介为每个消费者分发N条消息/请求,其中N是它可以处理的并发任务数量
- 当一个任务完成后,消费者会将结果回复给中介/生产者
- 生产者收到回复后,更新计算状态,最后打印一些报告
限制条件:
- 如果另一个用户提交工作,他的所有任务都会排在前一个用户之后(我想这在队列系统中是自动实现的,但我还没有考虑在多线程环境下的影响)
- 任务有提交顺序,但回复的顺序并不重要
更新
我进一步研究了一下,发现我实际的问题似乎是我在pika的SelectConnection.channel.basic_consume()函数中使用了一个简单的函数作为回调。我的最后一个(尚未实现的)想法是传递一个线程函数,而不是一个普通函数,这样回调就不会阻塞,消费者可以继续监听。
3 个回答
由于我对线程的了解不多,我的设置会运行多个消费者进程(数量基本上就是你设置的预取数量)。每个进程都会连接到两个队列,并且它们会愉快地处理任务,彼此之间并不知道对方的存在。
你的设置听起来不错。没错,你可以简单地设置一个回调函数来启动一个线程,然后在这个线程完成后再调用另一个回调,把结果通过B通道发回去。
简单来说,你的消费者应该有自己的队列(大小为N,也就是它们能同时处理的请求数量)。当通过A通道收到请求时,它应该把结果存储在一个主线程和线程池中的工作线程共享的队列里。一旦结果入队,pika就会回复一个确认信息(ACK),然后你的工作线程就会被唤醒,开始处理这个请求。
当工作线程完成任务后,它会把结果放回一个单独的结果队列,并发出一个回调给主线程,让主线程把结果发回给消费者。
你需要注意确保工作线程之间不会互相干扰,特别是当它们使用共享资源的时候,不过这又是另一个话题了。
正如你所注意到的,当你的程序运行一个回调函数时,它会被阻塞。根据你的回调函数的具体功能,有几种方法可以解决这个问题。
如果你的回调函数是IO密集型的(也就是需要进行很多网络或磁盘操作),你可以使用线程或者基于greenlet的解决方案,比如gevent、eventlet,或者greenhouse。不过要记住,Python有个叫做GIL(全局解释器锁)的限制,这意味着在一个Python进程中,只有一段Python代码可以同时运行。这就意味着如果你在用Python做大量计算,这些解决方案可能不会比你现在的方式快多少。
另一个选择是使用multiprocessing来实现多个进程的消费者。我发现多进程在并行工作时非常有用。你可以通过使用一个队列来实现,让父进程作为消费者,把工作分配给子进程,或者简单地启动多个进程,每个进程各自消费。我建议,除非你的应用需要处理非常高的并发(成千上万个工作者),否则可以简单地启动多个工作者,每个工作者从自己的连接中消费。这样,你可以利用AMQP的确认功能,如果一个消费者在处理任务时崩溃,消息会自动返回到队列中,并会被其他工作者接手,而不是直接丢失请求。
最后一个选择是,如果你控制生产者并且它也是用Python写的,可以使用像celery这样的任务库来帮你处理任务和队列的工作。我在几个大型项目中使用过celery,发现它写得非常好。它还会根据适当的配置处理多个消费者的问题。