Celery与路由
我需要在特定的celeryd实例上运行一些任务。所以我配置了队列:
celeryconfig.py:
CELERY_QUEUES = {
'celery': {
'exchange': 'celery',
'binding_key': 'celery',
},
'import': {
'exchange': 'import',
'binding_key': 'import.products',
},
}
CELERY_ROUTES = {
'celery_tasks.import_tasks.test': {
'queue': 'import',
'routing_key': 'import.products',
},
}
import_tasks.py:
@task
def test():
print 'test'
@task(exchange='import', routing_key='import.products')
def test2
print 'test2'
然后我启动celeryd:
celeryd -c 2 -l INFO -Q import
我尝试执行这些任务。'test'可以执行,但'test2'却不行。不过我不想在CELERY_ROUTES里每次都指定每个导入的任务。我该如何在任务定义中指定哪个队列应该执行任务呢?
3 个回答
0
我找到了一种解决办法,差不多能满足我的需求:
class CustomRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if task.startswith('celery_tasks.import_tasks'):
return {'exchange': 'import',
'routing_key': 'import.products'}
CELERY_ROUTES = (
CustomRouter(),
)
不过现在的问题是,我不能给任务起名字了。
1
看看Roman的解决方案 -- http://www.imankulov.name/posts/celery-for-internal-api.html -- 这个方法可以通过任务名称来访问任务,同时还能指定队列等设置,就像你直接导入任务模块一样。
1
哦,我忘了说我用的是send_task这个函数来执行任务。这个函数并不会导入任务,它只是把任务的名字发送到队列里。
所以我没有这样做:
from celery.execute import send_task
result = send_task(args.task, task_args, task_kwargs)
我写的是:
from celery import current_app as celery_app, registry as celery_registry
celery_imports = celery_app.conf.get('CELERY_IMPORTS')
if celery_imports:
for module in celery_imports:
__import__(module)
task = celery_registry.tasks.get(args.task)
if task:
result = task.apply_async(task_args, task_kwargs)