如何在celery任务中获取celery工作进程的名称?

14 投票
4 回答
13483 浏览
提问于 2025-04-18 07:32

我想要一个celery任务能够获取执行它的工作者的名字,这样我可以用来记录日志。我需要在任务内部处理这个,而不是直接查询消息代理。有没有办法做到这一点?我在使用celery和RabbitMQ,不知道这是否重要。

4 个回答

3

你最开始是想找你用 -n 这个标志输入的名字,对吧?这个名字在 initargs 数组里。下面是一个修改过的答案,可以帮你找到这个名字:

from celery import task
from billiard import current_process

@task
def getName():
    p = current_process()
    return p.initargs[1].split('@')[1]
7

我也需要工人的名字来做报告,所以我试了@cacois的解决方案,但在使用eventlet时似乎不太管用(current_process()没有initargs这个属性)。所以我把我的解决方案留在这里,方便以后参考:

from celery import task

@task(bind=True)
def getName(self):
    return self.request.hostname

这个属性的名字听起来有点奇怪,但它包含了在启动工人时用“-n”选项指定的名字。self的用法就像你在类方法中预期的那样,调用这个函数时不需要特别指定它(比如:getName.delay())。

7

你需要使用一个叫做billiard的东西,它用来管理工作者。

from celery import task
from billiard import current_process

@task
def getName():
    p = current_process()
    return p.index

然后在创建进程的时候,建立一个全局的字典,把ID和名字对应起来。

10

使用 celeryd_after_setup 这个信号可以获取工作者的名称,方法如下:

from celery.signals import celeryd_after_setup

@celeryd_after_setup.connect
def capture_worker_name(sender, instance, **kwargs):
    os.environ["WORKER_NAME"] = '{0}'.format(sender)

撰写回答