单机多个工作进程处理不同任务

2 投票
1 回答
527 浏览
提问于 2025-04-17 17:58

我正在尝试用celery构建一个分布式任务执行系统。

当我在一台机器(本地)上启动两个工作进程时,一个负责加法任务add,另一个负责减法任务sub。然后我用add.delay()来启动几个加法任务,这时在减法工作进程的终端出现了一个错误:

[2013-03-05 15:51:18,898: ERROR/MainProcess] 收到未注册的任务类型 'add_tasks.add'。

在这个测试中,我启动了两个加法任务:一个被加法工作进程接收,另一个却被减法工作进程接收,这就导致了上面的错误。我该如何更改配置,以确保第二个加法任务不会被减法工作进程接收呢?谢谢。

以下是代码:

add_tasks.py:

celery = Celery('add_tasks', backend='amqp', broker='amqp://guest@localhost//')

@celery.task
def add(x, y):
    sleep(20)
    return x + y

sub_tasks.py:

celery = Celery('sub_tasks', backend='amqp', broker='amqp://guest@localhost//')

@celery.task
def sub(x, y):
    sleep(10)
    return x - y

我通过在本地机器的两个终端中运行celery -A add_tasks worker --loglevel=info -n worker1celery -A sub_tasks worker --loglevel=info -n worker2来启动工作进程。

1 个回答

4

最后我发现ROUTER这个功能可以解决我的问题。我把我的解决方案放在这里,希望对遇到同样问题的人有帮助。

在启动一个工作进程时,我们可以使用-Q queue这个选项来限制工作进程只接受来自queue的任务。在我的情况下,我使用了celery -A add_tasks worker --loglevel=info -n worker1 -Q addition

另一方面,当我们要启动一个新任务时,应该明确指定队列参数,比如add.apply_async(queue='addition',priority=0,args=[1,4])sub.apply_async(queue='subtraction',priority=0,args=[1,4])。这样,添加任务就不会被减法工作进程接受。

撰写回答