当prefetch_count == 1时拒绝并重新排队RabbitMQ任务

3 投票
1 回答
4584 浏览
提问于 2025-04-18 10:33

假设我有一个包含五个项目的队列:

(tail) E, D, C, B, A (head)

我从这个队列的头部消费消息,但我觉得消息 A 现在不适合处理。我用 reject 把这个项目拒绝掉,并设置 requeue=True,这样队列变成了:

(tail) A, E, D, C, B (head)

接着我消费了 BCDE,并对每一个都进行了确认(ack)。现在队列里只剩下 A,我不断地消费它并且一再拒绝,形成了一个无尽的循环。如果有新的非 A 的消息进来,它几乎会立刻被消费掉,然后我又会继续尝试消费 A

我在 Pika 文档中的 Twisted Consumer 示例 上做了一个小修改:

import pika
from pika import exceptions
from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol,task


@defer.inlineCallbacks
def run(connection):

    channel = yield connection.channel()

    exchange = yield channel.exchange_declare(exchange='topic_link',type='topic')

    queue = yield channel.queue_declare(queue='hello', auto_delete=False, exclusive=False)

    yield channel.queue_bind(exchange='topic_link',queue='hello',routing_key='hello.world')

    #yield channel.basic_qos(prefetch_count=1)

    queue_object, consumer_tag = yield channel.basic_consume(queue='hello',no_ack=False)

    l = task.LoopingCall(read, queue_object)

    l.start(0.01)


@defer.inlineCallbacks
def read(queue_object):

    ch,method,properties,body = yield queue_object.get()

    print body

    if body == 'A':
        yield ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
    else:
        yield ch.basic_ack(delivery_tag=method.delivery_tag)


parameters = pika.ConnectionParameters()
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
d = cc.connectTCP('hostname', 5672)
d.addCallback(lambda protocol: protocol.ready)
d.addCallback(run)
reactor.run()

问题: 注意以下被注释掉的那一行:

#yield channel.basic_qos(prefetch_count=1)

当我取消注释那一行时,消费者在处理消息 A 时,会在拒绝后立刻再次尝试消费它,完全忽略了队列中可能在后面等待的其他项目。它并没有把被拒绝的项目放到队列的尾部,而是不断地重复尝试,完全阻塞了队列中的其他消息。

如果那一行被注释掉,程序就能正常工作(虽然速度稍慢)。如果那一行存在,并且 prefetch_count > 1,它也能正常工作。设置为正好 1 时,会触发这种行为。

我在拒绝消息 A 时是不是漏掉了什么步骤?还是说 Pika 的预取系统根本就不适合这种边缘情况?

1 个回答

5

如果你只有一个消费者,那么RabbitMQ就只能把消息发给那个被拒绝的消费者(无论是用basic.reject还是basic.nack)。

当你设置prefetch_count > 1时,你的消费者会收到一个循环的消息,还有一个新的消息会排在这个循环消息的前面(也就是说,循环的消息会一直在最前面)。

如果你意外地收到了N*M个循环消息,并且prefetch_count <= N,消费者数量<= M,那么所有的消息都会变成循环状态(这会导致CPU过载等问题),所以最好检查一下rejected消息的标志,并在消息已经重新投递的情况下,使用一些更复杂的逻辑。

撰写回答