如何设置Celery以调用自定义工作初始化?
我刚接触Celery,正在尝试设置一个项目,里面有两个独立的队列(一个用来计算,另一个用来执行)。到目前为止,一切都还不错。
我遇到的问题是,执行队列中的工作者需要用一个独特的对象ID来实例化一个类(每个工作者一个ID)。我在想,是否可以写一个自定义的工作者初始化方法,在开始时初始化这个对象,并在工作者被杀掉之前一直保存在内存中。
我在custom_task上找到了一个类似的问题,但提出的解决方案在我的情况下不适用。
考虑以下这个简单的例子:
celery.py
from celery import Celery
app = Celery('proj',
broker='amqp://guest@localhost//',
backend='amqp://',
include=['proj.tasks'])
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=60,
CELERY_ROUTES = {"proj.tasks.add1": {"queue": "q1"}},
)
if __name__ == '__main__':
app.start()
tasks.py
from proj.celery import app
from celery.signals import worker_init
@worker_init.connect(sender='worker1@hostname')
def configure_worker1(*args, **kwargs):
#SETUP id=1 for add1 here???
@worker_init.connect(sender='worker2@hostname')
def configure_worker2(*args, **kwargs):
#SETUP id=2 for add1 here???
@app.task
def add1(y):
return id + y
@app.task
def add(x, y):
return x + y
初始化:
celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info
这是正确的方法吗?如果是的话,我应该在tasks.py
中的configure_worker1
函数里写些什么,以便在工作者初始化时设置id
?
谢谢
1 个回答
8
我通过查看这个链接找到了答案:http://docs.celeryproject.org/en/latest/userguide/tasks.html#instantiation
tasks.py 文件看起来是这样的:
from proj.celery import app
from celery import Task
class Task1(Task):
def __init__(self):
self._x = 1.0
class Task2(Task):
def __init__(self):
self._x = 2.0
@app.task(base=Task1)
def add1(y):
return add1._x + y
@app.task(base=Task2)
def add2(y):
return add2._x + y
初始化和之前一样:
celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info