单机多个工作进程处理不同任务
我正在尝试用celery构建一个分布式任务执行系统。
当我在一台机器(本地)上启动两个工作进程时,一个负责加法任务add
,另一个负责减法任务sub
。然后我用add.delay()
来启动几个加法任务,这时在减法工作进程的终端出现了一个错误:
[2013-03-05 15:51:18,898: ERROR/MainProcess] 收到未注册的任务类型 'add_tasks.add'。
在这个测试中,我启动了两个加法任务:一个被加法工作进程接收,另一个却被减法工作进程接收,这就导致了上面的错误。我该如何更改配置,以确保第二个加法任务不会被减法工作进程接收呢?谢谢。
以下是代码:
add_tasks.py:
celery = Celery('add_tasks', backend='amqp', broker='amqp://guest@localhost//')
@celery.task
def add(x, y):
sleep(20)
return x + y
sub_tasks.py:
celery = Celery('sub_tasks', backend='amqp', broker='amqp://guest@localhost//')
@celery.task
def sub(x, y):
sleep(10)
return x - y
我通过在本地机器的两个终端中运行celery -A add_tasks worker --loglevel=info -n worker1
和celery -A sub_tasks worker --loglevel=info -n worker2
来启动工作进程。
1 个回答
最后我发现ROUTER
这个功能可以解决我的问题。我把我的解决方案放在这里,希望对遇到同样问题的人有帮助。
在启动一个工作进程时,我们可以使用-Q queue
这个选项来限制工作进程只接受来自queue
的任务。在我的情况下,我使用了celery -A add_tasks worker --loglevel=info -n worker1 -Q addition
。
另一方面,当我们要启动一个新任务时,应该明确指定队列参数,比如add.apply_async(queue='addition',priority=0,args=[1,4])
和sub.apply_async(queue='subtraction',priority=0,args=[1,4])
。这样,添加任务就不会被减法工作进程接受。