Celery与路由

3 投票
3 回答
4445 浏览
提问于 2025-04-17 03:42

我需要在特定的celeryd实例上运行一些任务。所以我配置了队列:

celeryconfig.py:

CELERY_QUEUES = {
    'celery': {
        'exchange': 'celery',
        'binding_key': 'celery',
    },
    'import': {
        'exchange': 'import',
        'binding_key': 'import.products',
    },
}

CELERY_ROUTES = {
    'celery_tasks.import_tasks.test': {
        'queue': 'import',
        'routing_key': 'import.products',
    },
}

import_tasks.py:

@task
def test():
    print 'test'

@task(exchange='import', routing_key='import.products')
def test2
    print 'test2'

然后我启动celeryd:

celeryd -c 2 -l INFO -Q import

我尝试执行这些任务。'test'可以执行,但'test2'却不行。不过我不想在CELERY_ROUTES里每次都指定每个导入的任务。我该如何在任务定义中指定哪个队列应该执行任务呢?

3 个回答

0

我找到了一种解决办法,差不多能满足我的需求:

class CustomRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task.startswith('celery_tasks.import_tasks'):
            return {'exchange': 'import',
                    'routing_key': 'import.products'}

CELERY_ROUTES = (
    CustomRouter(),
)

不过现在的问题是,我不能给任务起名字了。

1

看看Roman的解决方案 -- http://www.imankulov.name/posts/celery-for-internal-api.html -- 这个方法可以通过任务名称来访问任务,同时还能指定队列等设置,就像你直接导入任务模块一样。

1

哦,我忘了说我用的是send_task这个函数来执行任务。这个函数并不会导入任务,它只是把任务的名字发送到队列里。

所以我没有这样做:

from celery.execute import send_task

result = send_task(args.task, task_args, task_kwargs)

我写的是:

from celery import current_app as celery_app, registry as celery_registry

celery_imports = celery_app.conf.get('CELERY_IMPORTS')
if celery_imports:
    for module in celery_imports:
        __import__(module)

task = celery_registry.tasks.get(args.task)
if task:
    result = task.apply_async(task_args, task_kwargs)

撰写回答