Celery多进程配置未从Django接收任务

1 投票
1 回答
4368 浏览
提问于 2025-04-18 14:41

我用以下命令来启动我的工作进程:

celery -A myapp multi start 4 -l debug -Q1:3 queue1,queue2 -Q:4 queue3

工作进程启动得很好,所以当我运行

celery inspect active_queues

时,队列看起来是分配好的。

然后我从我的Django应用中启动任务,代码如下:

result = chain(task1.s(**kwargs).set(queue='queue1'),task2.s(**kwargs).set(queue='queue2'))()

我用result.parent解析结果变量,以获取所有任务的ID,并把它们记录到数据库中以便后续检查。当我发出

task = AsyncResult(task.id)
task.status

命令时,我得到每个我用链启动的任务的状态都是 'PENDING'。celery的日志似乎没有收到任何任务。不过,当我发出一个

celery purge

命令后,接着

yes

我收到消息说我的任务实际上已经从一个队列中移除,但从此之后被删除的任务的AsyncResult.status仍然显示为'PENDING',这些任务从未开始。我使用rabbitmq-server作为消息代理,所有配置都是默认的。我的celery配置也是默认的。这真的很奇怪,因为在另一个环境中,同样的代码和命令却产生了不同的结果:工作进程也启动了,但它们确实接收到了相同的任务并且顺利执行。请考虑一下这里可能出现的问题。

附言:当我用另一种方式启动工作进程时:

celery -A myapp worker -Q queue1,queue2,queue3 -l debug

我仍然无法让我的任务执行。这个问题在我修改链以启动任务并添加了

.set(queue='queue1')

或queue2或queue3后开始出现。

再附言:

我所有的任务都是用一个

@shared_task

装饰器

编写的。有没有办法查看哪些任务(我可以通过celery purge删除)在队列中等待,以及它们在等待哪个队列的名字?

1 个回答

1

Celery的默认设置应该能满足你的需求,所以可能的问题是你在某些选项上做了设置,导致你的队列没有反应。在这种情况下,可以考虑把这些选项注释掉(详细内容在文档中):

  • CELERY_QUEUES

  • CELERY_ROUTES

  • CELERY_DEFAULT_EXCHANGE

  • CELERY_DEFAULT_ROUTING_KEY

  • CELERY_DEFAULT_ROUTING_KEY

至于你的问题,我觉得这不是完整的答案,但你可以从RabbitMQ中列出所有活动的队列。

使用Celery时,可以参考文档中的内容:

celery -A proj inspect active

使用RabbitMQ时,可以参考文档中的内容:

rabbitmqadmin list queues vhost name node messages message_stats.publish_details.rate

撰写回答