如何设置Celery以调用自定义工作初始化?

7 投票
1 回答
14040 浏览
提问于 2025-05-01 04:56

我刚接触Celery,正在尝试设置一个项目,里面有两个独立的队列(一个用来计算,另一个用来执行)。到目前为止,一切都还不错。

我遇到的问题是,执行队列中的工作者需要用一个独特的对象ID来实例化一个类(每个工作者一个ID)。我在想,是否可以写一个自定义的工作者初始化方法,在开始时初始化这个对象,并在工作者被杀掉之前一直保存在内存中。

我在custom_task上找到了一个类似的问题,但提出的解决方案在我的情况下不适用。

考虑以下这个简单的例子:

celery.py

from celery import Celery

app = Celery('proj',
             broker='amqp://guest@localhost//',
             backend='amqp://',
             include=['proj.tasks'])

app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=60,
    CELERY_ROUTES = {"proj.tasks.add1": {"queue": "q1"}},
)

if __name__ == '__main__':
    app.start()

tasks.py

from proj.celery import app
from celery.signals import worker_init

@worker_init.connect(sender='worker1@hostname')
def configure_worker1(*args, **kwargs):
    #SETUP id=1 for add1 here???

@worker_init.connect(sender='worker2@hostname')
def configure_worker2(*args, **kwargs):
    #SETUP id=2 for add1 here???

@app.task
def add1(y):
    return id + y

@app.task
def add(x, y):
    return x + y

初始化:

celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info

这是正确的方法吗?如果是的话,我应该在tasks.py中的configure_worker1函数里写些什么,以便在工作者初始化时设置id

谢谢

暂无标签

1 个回答

8

我通过查看这个链接找到了答案:http://docs.celeryproject.org/en/latest/userguide/tasks.html#instantiation

tasks.py 文件看起来是这样的:

from proj.celery import app
from celery import Task

class Task1(Task):
    def __init__(self):
        self._x = 1.0

class Task2(Task):
    def __init__(self):
        self._x = 2.0

@app.task(base=Task1)
def add1(y):
    return add1._x + y

@app.task(base=Task2)
def add2(y):
    return add2._x + y

初始化和之前一样:

celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info

撰写回答