如何获取任务运行的队列 - celery

5 投票
3 回答
8767 浏览
提问于 2025-04-17 22:09

我刚开始使用celery,有个问题想问。我的任务很简单:

@app.task(name='test_install_queue')
def test_install_queue():
    return subprocess.call("exit 0",shell=True)

然后我在测试案例中这样调用这个任务:

result = tasks.test_default_queue.apply_async(queue="install")

这个任务在队列install中成功运行(因为我在celery的日志中看到了它,并且完成得很好)。但是我想知道有没有一种编程的方法,可以从存储在result中的对象中找到任务test_install_queue是在哪个队列中运行的。

谢谢!

编辑:

我把任务改成了这样:

@app.task(name='test_install_queue',bind=True)
def test_install_queue(self):
    return self.request.__dict__

然后我使用apply_async的结果如下:

result = tasks.test_install_queue.apply_async(queue="install")
assert "install" in result.get()["hostname"]

而解决方法是,工作进程(主机名)和在工作进程中初始化的唯一队列有相同的名字。

3 个回答

3

假设这个任务是一个绑定任务,你可以通过 self.request.delivery_info['routing_key'] 来获取队列的名称。

6

我最近也遇到了这个问题,开始的时候我对“lexabug”提供的复杂解决方案持怀疑态度,因为我觉得没必要这么复杂。
而且连Celery的官方文档也没有给出有效的替代方案,所以我决定自己研究一下,利用反射来找出哪个对象包含我需要的信息,结果我找到了一个超级简单直接的解决办法。具体来说,我在Celery中写了一个钩子,或者说在Celery的术语中叫信号,这就是我根据任务名称获取队列名称的方法:

    @signals.after_task_publish.connect()
    def on_task_publish(sender=None, headers=None, body=None, **kwargs):

        # "sender" is a string containing task name 
        # ("celery" here is the celery app)
        task: Task = celery.tasks.get(sender)

        # once we have the task object, we can access the "queue" property 
        # which contains the name of the queue 
        # (it' a dynamic property so don't expect support by your IDE)
        queue_name: str = task.queue if task is not None else 'unknown'

顺便说一下,我使用的是Celery 4.4版本。

8

你可以试试下面的方法:

delivery_info = app.current_task.request.delivery_info
# by default celery uses the same name for queues and exchanges
original_queue = delivery_info['exchange']
for queue in app.amqp.queues.itervalues():
    if queue.exchange.name == delivery_info['exchange'] 
        and queue.routing_key == delivery_info['routing_key']:
            original_queue = queue.name
            break

这个方法是基于你使用默认的celery设置,并且你的交换方式是直接的假设。如果你需要一个更通用的解决方案来处理广播和主题交换,那么你就得查看每个声明的队列在 app.amqp.queues 中的路由键。

撰写回答