RabbitMQ/Pika保证按照创建的顺序接收消息?

2024-04-24 22:29:02 发布

您现在位置:Python中文网/ 问答频道 /正文

作为一个简单的示例,我将向一个新的RabbitMQ(v2.6.1)队列添加5个项目:

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='not.my.real.server.net'))
channel = connection.channel()
channel.queue_declare(queue='dw.neil',durable=True)
# add 5 messages to the queue, the numbers 1-5
for x in range(5):
    message = x+1
    channel.basic_publish(exchange='',routing_key='dw.neil', body=str(message))
    print " [x] Sent '%s'" % message
connection.close()

我清除我的队列,然后运行以上代码添加5项:

^{pr2}$

现在,我试图模拟失败的处理。给定要从队列中使用的以下代码。请注意,我的basic_ack调用被注释掉了:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='not.my.real.server.net'))
channel = connection.channel()
channel.queue_declare(queue='dw.neil',durable=True)
method_frame, header_frame, body=channel.basic_get(queue='dw.neil')
print method_frame, header_frame
print "body: %s" % body
#channel.basic_ack(delivery_tag=method_frame.delivery_tag)
connection.close()

我运行接收代码从队列中获取一个项目。正如我所料,我得到了第1项:

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 1

自从打电话给基本频道确认()被注释掉,我希望将未确认的消息放在队列中,以便下一个使用者获得它。我希望消息1是队列外的第一条消息,Redelivered属性设置为True。而是收到消息#2:

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 2

并且队列中的所有其他消息都会在“1”返回并将Redelivered flag设置为True之前接收到:

。。。在

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 5

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=True', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 1

是否有任何属性或选项可供我设置,以便在确认前一直获得交付?在

我的用例是用顺序生成的文件加载数据仓库。我们使用基于消息的处理来让我的程序知道一些新文件已经准备好并将被加载到DW中。我们必须按照文件生成的顺序来处理它们。在


Tags: true消息messageexchange队列queuetagchannel
2条回答

在rabbitmq2.7.0中已经解决了这个问题,我们运行的是2.6.1。在

release notes

New features in this release include:

  • order preserved of messages re-queued for a consumer

尝试使用channel.basic_拒绝-这将把未确认的消息推回到RabbitMQ,RabbitMQ会将该消息视为新消息。如果你有一个失败的消息卡住了,你可以使用channel.basic_恢复告诉RabbitMQ重新传递所有未确认的消息。在

http://www.rabbitmq.com/extensions.html#negative-acknowledgements提供有关基本。拒绝与基本。Nack. 在

消息排序语义在http://www.rabbitmq.com/semantics.html中解释

相关问题 更多 >