Python Pika-消费者线程

2024-04-25 14:02:29 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在开发一个Python应用程序,它有一个后台线程,用于消费RabbitMQ队列中的消息(主题场景)。

我在按钮的单击事件上启动线程。 这是我的代码,请注意“#self.receive_command()”。

def on_click_start_call(self,widget):


    t_msg = threading.Thread(target=self.receive_command)
    t_msg.start()
    t_msg.join(0)
    #self.receive_command()


def receive_command(self):

    syslog.syslog("ENTERED")

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    syslog.syslog("1")

    channel = connection.channel()
    syslog.syslog("2")

    channel.exchange_declare(exchange='STORE_CMD', type='topic')
    syslog.syslog("3")

    result = channel.queue_declare(exclusive=True)
    syslog.syslog("4")

    queue_name = result.method.queue
    syslog.syslog("5")

    def callback_rabbit(ch,method,properties,body):
        syslog.syslog("RICEVUTO MSG: RKEY:"+method.routing_key+" MSG: "+body+"\n")

    syslog.syslog("6")

    channel.queue_bind(exchange='STORE_CMD', queue=queue_name , routing_key='test.routing.key')
    syslog.syslog("7")

    channel.basic_consume(callback_rabbit,queue=queue_name,no_ack=True)
    syslog.syslog("8")

    channel.start_consuming()

如果运行此代码,在syslog上看不到消息1、2、3、5、6、7、8,但只能看到“ENTERED”。所以,代码锁定在pika.BlokingConnection上。

如果我运行相同的代码(注释线程指令并取消对函数的直接调用),则所有操作都按要求进行,并且正确接收消息。

有什么解决方案可以让消费者进入线程?

提前谢谢

戴维德


Tags: 代码nameself消息exchangequeuedefchannel
2条回答

我用最新版本的Pika在我的机器上测试了代码。它工作得很好。Pika存在线程问题,但只要每个线程创建一个连接,就不应该是问题。

如果您遇到问题,很可能是因为旧版本Pika中的错误,或者与线程无关的问题导致了问题。

我建议您避免使用0.9.13,因为有多个bug,但是0.9.140.10.0应该很快就会发布™.

[编辑]Pika 0.9.14已经发布。

这是我使用的代码。

def receive_command():
    print("ENTERED")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    print("1")
    channel = connection.channel()
    print("2")
    channel.exchange_declare(exchange='STORE_CMD', type='topic')
    print("3")
    result = channel.queue_declare(exclusive=True)
    print("4")
    queue_name = result.method.queue
    print("5")
    def callback_rabbit(ch,method,properties,body):
        print("RICEVUTO MSG: RKEY:"+method.routing_key+" MSG: "+body+"\n")
    print("6")
    channel.queue_bind(exchange='STORE_CMD', queue=queue_name , routing_key='test.routing.key')
    print("7")
    channel.basic_consume(callback_rabbit,queue=queue_name,no_ack=True)
    print("8")
    channel.start_consuming()

def start():
    t_msg = threading.Thread(target=receive_command)
    t_msg.start()
    t_msg.join(0)
    #self.receive_command()
start()

另一种方法是传递给线程方法channel.start_consuming作为目标,然后将回调传递给consume方法。 用法:consume(callback=your_method, queue=your_queue)

import threading

def consume(self, *args, **kwargs):
    if "channel" not in kwargs \
            or "callback" not in kwargs \
            or "queue" not in kwargs \
            or not callable(kwargs["callback"]):
        return None

    channel = kwargs["channel"]
    callback = kwargs["callback"]
    queue = kwargs["queue"]
    channel.basic_consume(callback, queue=queue, no_ack=True)

    t1 = threading.Thread(target=channel.start_consuming)
    t1.start()
    t1.join(0)

相关问题 更多 >

    热门问题