我的应用程序的producer模块由希望提交要在小集群上完成的工作的用户运行。它通过RabbitMQ消息代理以JSON形式发送订阅。
我已经尝试了几种策略,到目前为止最好的策略是以下策略,但仍然没有完全奏效:
每个群集计算机都运行一个使用者模块,该模块将自己订阅到AMQP队列,并发出一个预取计数,告诉代理它一次可以运行多少个任务。
我可以使用Pika AMQP库中的SelectConnection使其工作。消费者和生产者都启动两个通道,每个通道连接一个队列。生产者在信道[A]上发送请求并等待信道[B]中的响应,消费者在信道[A]上等待请求并在信道[B]上发送响应。然而,当使用者运行计算响应的回调时,它似乎会阻塞,因此每次在每个使用者处只有一个任务执行。
最后我需要的是:
限制:
更新
我已经做了进一步的研究,我的实际问题似乎是我使用一个简单的函数作为pika的SelectConnection.channel.basic_consume()函数的回调。我的最后一个(未实现的)想法是传递一个线程函数,而不是一个常规函数,这样回调就不会阻塞,消费者可以继续监听。
目前没有回答
相关问题 更多 >
编程相关推荐