使用Python、Pika和AMQP设计异步RPC应用程序的最佳模式是什么?

2024-04-29 04:58:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我的应用程序的producer模块由希望提交要在小集群上完成的工作的用户运行。它通过RabbitMQ消息代理以JSON形式发送订阅。

我已经尝试了几种策略,到目前为止最好的策略是以下策略,但仍然没有完全奏效:

每个群集计算机都运行一个使用者模块,该模块将自己订阅到AMQP队列,并发出一个预取计数,告诉代理它一次可以运行多少个任务。

我可以使用Pika AMQP库中的SelectConnection使其工作。消费者和生产者都启动两个通道,每个通道连接一个队列。生产者在信道[A]上发送请求并等待信道[B]中的响应,消费者在信道[A]上等待请求并在信道[B]上发送响应。然而,当使用者运行计算响应的回调时,它似乎会阻塞,因此每次在每个使用者处只有一个任务执行。

最后我需要的是:

  1. 消费者[A]向集群订阅他的任务(每次约5k)
  2. 代理为每个使用者分派N条消息/请求,其中N是它可以处理的并发任务数
  3. 当一个任务完成时,消费者会将结果回复给代理/生产者
  4. 生产者接收回复,更新计算状态,最后打印一些报告

限制:

  • 如果另一个用户提交工作,那么他的所有任务都将在前一个用户之后排队(我猜这在队列系统中是自动实现的,但是我还没有考虑到线程环境的影响)
  • 任务有一个要提交的顺序,但它们被回复的顺序并不重要

更新

我已经做了进一步的研究,我的实际问题似乎是我使用一个简单的函数作为pika的SelectConnection.channel.basic_consume()函数的回调。我的最后一个(未实现的)想法是传递一个线程函数,而不是一个常规函数,这样回调就不会阻塞,消费者可以继续监听。


Tags: 模块函数用户消息amqp代理队列使用者