为什么RabbitMQ在持久队列上不持久化消息?
我正在通过Celery在Django中使用RabbitMQ。我使用的是最基本的设置:
# RabbitMQ connection settings
BROKER_HOST = 'localhost'
BROKER_PORT = '5672'
BROKER_USER = 'guest'
BROKER_PASSWORD = 'guest'
BROKER_VHOST = '/'
我导入了一个Celery任务,并把它排队在一年后执行。在iPython命令行中:
In [1]: from apps.test_app.tasks import add
In [2]: dt=datetime.datetime(2012, 2, 18, 10, 00)
In [3]: add.apply_async((10, 6), eta=dt)
DEBUG:amqplib:Start from server, version: 8.0, properties: {u'information': 'Licensed under the MPL. See http://www.rabbitmq.com/', u'product': 'RabbitMQ', u'version': '2.2.0', u'copyright': 'Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.', u'platform': 'Erlang/OTP'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: ['en_US']
DEBUG:amqplib:Open OK! known_hosts []
DEBUG:amqplib:using channel_id: 1
DEBUG:amqplib:Channel open
DEBUG:amqplib:Closed channel #1
Out[3]: <AsyncResult: cfc507a1-175f-438e-acea-8c989a120ab3>
RabbitMQ在celery队列中收到了这个消息:
$ rabbitmqctl list_queues name messages durable
Listing queues ...
KTMacBook.local.celeryd.pidbox 0 false
celery 1 true
celeryctl_KTMacBook.local 0 true
...done.
然后我通过按下控制键+C,再按'a'来终止RabbitMQ。当我再次启动服务器并用rabbitmqctl检查时,它显示celery队列中没有消息:
$ rabbitmqctl list_queues name messages durable
Listing queues ...
celery 0 true
celeryctl_KTMacBook.local 0 true
...done.
celery队列是持久化的。为什么消息没有被保存呢?我需要做些什么才能让消息持久化?
2 个回答
23
让队列变得耐用,并不等于让队列里的消息变得持久。耐用的队列意味着当服务器重启后,它们会自动恢复,这在你的情况中显然是发生了。但这并不影响消息本身。
要让消息持久,你还需要把消息的 delivery_mode
属性设置为 2。想了解更多,可以看看经典的文章 《兔子与兔窝》。
补充:完整链接已失效,但截至2013年12月,你仍然可以通过主网址找到这篇博客文章: http://blogs.digitar.com/jjww/
6
要查看消息的 delivery_mode
,你可以消费(使用)这些消息,然后查看消息的属性。
>>> from tasks import add
>>> add.delay(2, 2)
>>> from celery import current_app
>>> conn = current_app.broker_connection()
>>> consumer = current_app.amqp.get_task_consumer(conn)
>>> messages = []
>>> def callback(body, message):
... messages.append(message)
>>> consumer.register_callback(callback)
>>> consumer.consume()
>>> conn.drain_events(timeout=1)
>>> messages[0].properties
>>> messages[0].properties
{'application_headers': {}, 'delivery_mode': 2, 'content_encoding': u'binary', 'content_type': u'application/x-python-serialize'}