Celery多进程配置未从Django接收任务
我用以下命令来启动我的工作进程:
celery -A myapp multi start 4 -l debug -Q1:3 queue1,queue2 -Q:4 queue3
工作进程启动得很好,所以当我运行
celery inspect active_queues
时,队列看起来是分配好的。
然后我从我的Django应用中启动任务,代码如下:
result = chain(task1.s(**kwargs).set(queue='queue1'),task2.s(**kwargs).set(queue='queue2'))()
我用result.parent解析结果变量,以获取所有任务的ID,并把它们记录到数据库中以便后续检查。当我发出
task = AsyncResult(task.id)
task.status
命令时,我得到每个我用链启动的任务的状态都是 'PENDING'。celery的日志似乎没有收到任何任务。不过,当我发出一个
celery purge
命令后,接着
yes
我收到消息说我的任务实际上已经从一个队列中移除,但从此之后被删除的任务的AsyncResult.status仍然显示为'PENDING',这些任务从未开始。我使用rabbitmq-server作为消息代理,所有配置都是默认的。我的celery配置也是默认的。这真的很奇怪,因为在另一个环境中,同样的代码和命令却产生了不同的结果:工作进程也启动了,但它们确实接收到了相同的任务并且顺利执行。请考虑一下这里可能出现的问题。
附言:当我用另一种方式启动工作进程时:
celery -A myapp worker -Q queue1,queue2,queue3 -l debug
我仍然无法让我的任务执行。这个问题在我修改链以启动任务并添加了
.set(queue='queue1')
或queue2或queue3后开始出现。
再附言:
我所有的任务都是用一个
@shared_task
装饰器
编写的。有没有办法查看哪些任务(我可以通过celery purge删除)在队列中等待,以及它们在等待哪个队列的名字?
1 个回答
Celery的默认设置应该能满足你的需求,所以可能的问题是你在某些选项上做了设置,导致你的队列没有反应。在这种情况下,可以考虑把这些选项注释掉(详细内容在文档中):
CELERY_QUEUES
CELERY_ROUTES
CELERY_DEFAULT_EXCHANGE
CELERY_DEFAULT_ROUTING_KEY
CELERY_DEFAULT_ROUTING_KEY
至于你的问题,我觉得这不是完整的答案,但你可以从RabbitMQ中列出所有活动的队列。
使用Celery时,可以参考文档中的内容:
celery -A proj inspect active
使用RabbitMQ时,可以参考文档中的内容:
rabbitmqadmin list queues vhost name node messages message_stats.publish_details.rate