如何设置芹菜以调用自定义工作进程初始化?

2024-03-29 13:58:22 发布

您现在位置:Python中文网/ 问答频道 /正文

我对芹菜很陌生,我一直试图建立一个有两个独立队列(一个用于计算,另一个用于执行)的项目。到目前为止,还不错。

我的问题是,执行队列中的工人需要实例化一个具有唯一对象id(每个工人一个id)的类。我想知道我是否可以编写一个自定义的工作程序初始化来在启动时初始化对象,并将其保存在内存中,直到工作程序被杀死。

我在custom_task上发现了一个类似的问题,但建议的解决方案在我的情况下不起作用。

考虑以下玩具示例:

芹菜.py

from celery import Celery

app = Celery('proj',
             broker='amqp://guest@localhost//',
             backend='amqp://',
             include=['proj.tasks'])

app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=60,
    CELERY_ROUTES = {"proj.tasks.add1": {"queue": "q1"}},
)

if __name__ == '__main__':
    app.start()

任务.py

from proj.celery import app
from celery.signals import worker_init

@worker_init.connect(sender='worker1@hostname')
def configure_worker1(*args, **kwargs):
    #SETUP id=1 for add1 here???

@worker_init.connect(sender='worker2@hostname')
def configure_worker2(*args, **kwargs):
    #SETUP id=2 for add1 here???

@app.task
def add1(y):
    return id + y

@app.task
def add(x, y):
    return x + y

正在初始化:

celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info

这是正确的方法吗?如果是,我应该在tasks.py中的configure_worker1函数中写入什么来设置工作机初始化时的id

谢谢


Tags: frompyimportidapptaskdefstart
1条回答
网友
1楼 · 发布于 2024-03-29 13:58:22

我根据这个http://docs.celeryproject.org/en/latest/userguide/tasks.html#instantiation找到了答案

tasks.py如下所示:

from proj.celery import app
from celery import Task

class Task1(Task):
    def __init__(self):
        self._x = 1.0

class Task2(Task):
    def __init__(self):
        self._x = 2.0

@app.task(base=Task1)
def add1(y):
    return add1._x + y

@app.task(base=Task2)
def add2(y):
    return add2._x + y

像以前一样初始化:

celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info

相关问题 更多 >