如何获取任务运行的队列 - celery
我刚开始使用celery,有个问题想问。我的任务很简单:
@app.task(name='test_install_queue')
def test_install_queue():
return subprocess.call("exit 0",shell=True)
然后我在测试案例中这样调用这个任务:
result = tasks.test_default_queue.apply_async(queue="install")
这个任务在队列install
中成功运行(因为我在celery的日志中看到了它,并且完成得很好)。但是我想知道有没有一种编程的方法,可以从存储在result
中的对象中找到任务test_install_queue
是在哪个队列中运行的。
谢谢!
编辑:
我把任务改成了这样:
@app.task(name='test_install_queue',bind=True)
def test_install_queue(self):
return self.request.__dict__
然后我使用apply_async
的结果如下:
result = tasks.test_install_queue.apply_async(queue="install")
assert "install" in result.get()["hostname"]
而解决方法是,工作进程(主机名)和在工作进程中初始化的唯一队列有相同的名字。
3 个回答
3
假设这个任务是一个绑定任务,你可以通过 self.request.delivery_info['routing_key']
来获取队列的名称。
6
我最近也遇到了这个问题,开始的时候我对“lexabug”提供的复杂解决方案持怀疑态度,因为我觉得没必要这么复杂。
而且连Celery的官方文档也没有给出有效的替代方案,所以我决定自己研究一下,利用反射来找出哪个对象包含我需要的信息,结果我找到了一个超级简单直接的解决办法。具体来说,我在Celery中写了一个钩子,或者说在Celery的术语中叫信号,这就是我根据任务名称获取队列名称的方法:
@signals.after_task_publish.connect()
def on_task_publish(sender=None, headers=None, body=None, **kwargs):
# "sender" is a string containing task name
# ("celery" here is the celery app)
task: Task = celery.tasks.get(sender)
# once we have the task object, we can access the "queue" property
# which contains the name of the queue
# (it' a dynamic property so don't expect support by your IDE)
queue_name: str = task.queue if task is not None else 'unknown'
顺便说一下,我使用的是Celery 4.4版本。
8
你可以试试下面的方法:
delivery_info = app.current_task.request.delivery_info
# by default celery uses the same name for queues and exchanges
original_queue = delivery_info['exchange']
for queue in app.amqp.queues.itervalues():
if queue.exchange.name == delivery_info['exchange']
and queue.routing_key == delivery_info['routing_key']:
original_queue = queue.name
break
这个方法是基于你使用默认的celery设置,并且你的交换方式是直接的假设。如果你需要一个更通用的解决方案来处理广播和主题交换,那么你就得查看每个声明的队列在 app.amqp.queues
中的路由键。