使用RPC后端获取芹菜任务结果

2024-04-26 04:36:40 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在努力从芹菜任务中获得结果。 我的应用程序入口点如下所示:

from app import create_app,celery
celery.conf.task_default_queue = 'order_master'
order_app = create_app('../config.order_master.py')

现在,在启动应用程序之前,我先启动RabbitMQ并确保它没有队列:

root@3d2e6b124780:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
root@3d2e6b124780:/#

现在我启动应用程序。启动后,我仍然没有看到RabbitMQ中的队列。当我从应用程序jobs.add_together.delay(2, 3)启动任务时,我得到任务ID:

ralfeus@web-2 /v/w/order (multiple-instances)> (order) curl localhost/test
{"result":"a2c07de4-f9f2-4b21-ae47-c6d92f2a7dfe"}
ralfeus@web-2 /v/w/order (multiple-instances)> (order)

此时,我可以看到我的队列中有一条消息:

root@3d2e6b124780:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
dd65ba89-cce9-3e0b-8252-c2216912a910    0
order_master   1
root@3d2e6b124780:/# 

现在我开始做芹菜工人:

ralfeus@web-2 /v/w/order (multiple-instances)>
/usr/virtualfish/order/bin/celery -A main_order_master:celery worker --loglevel=INFO -n order_master -Q order_master --concurrency 2
INFO:app:Blueprints are registered

 -------------- celery@order_master v5.0.0 (singularity)
--- ***** -----
-- ******* ---- Linux-5.4.0-51-generic-x86_64-with-glibc2.29 2020-10-22 16:38:56
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         app:0x7f374715c5b0
- ** ---------- .> transport:   amqp://guest:**@172.17.0.1:5672//
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> order_master            exchange=order_master(direct) key=order_master


[tasks]
  . app.jobs.add_together
  . app.jobs.post_purchase_orders

[2020-10-22 16:38:57,263: INFO/MainProcess] Connected to amqp://guest:**@172.17.0.1:5672//
[2020-10-22 16:38:57,304: INFO/MainProcess] mingle: searching for neighbors
[2020-10-22 16:38:58,354: INFO/MainProcess] mingle: all alone
[2020-10-22 16:38:58,375: INFO/MainProcess] celery@order_master ready.
[2020-10-22 16:38:58,377: INFO/MainProcess] Received task: app.jobs.add_together[f855bec7-307d-4570-ab04-3d036005a87b]
[2020-10-22 16:40:38,616: INFO/ForkPoolWorker-2] Task app.jobs.add_together[f855bec7-307d-4570-ab04-3d036005a87b] succeeded in 100.13561034202576s: 5

因此,工人可以拿起任务并执行它并产生结果。但是我不能得到结果。相反,当我请求结果时,我得到以下结果:

curl localhost/test/f855bec7-307d-4570-ab04-3d036005a87b
{"state":"PENDING"}
ralfeus@web-2 /v/w/order (multiple-instance)> (order)

如果我现在检查队列,我会看到:

root@3d2e6b124780:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
dd65ba89-cce9-3e0b-8252-c2216912a910    1
65d80661-6195-3986-9fa2-e468eaab656e    0
celeryev.9ca5a092-9a0c-4bd5-935b-f5690cf9665b   0
order_master   0
celery@order_master.celery.pidbox      0
root@3d2e6b124780:/#

我看到队列dd65ba89-cce9-3e0b-8252-c2216912a910有一条消息,当我检查时,它包含结果。那么,为什么它会出现在那里,我如何得到它呢?所有手册都说我只需要按ID获取任务。但在我的情况下,任务仍然处于挂起状态


Tags: infomasteraddapp应用程序for队列jobs
1条回答
网友
1楼 · 发布于 2024-04-26 04:36:40

根据芹菜文献:

RPC Result Backend (RabbitMQ/QPid)

The RPC result backend (rpc://) is special as it doesn’t actually store the states, but rather sends them as messages. This is an important difference as it means that a result can only be retrieved once, and only by the client that initiated the task. Two different processes can’t wait for the same result.

因此,使用rpc://不适合稍后通过另一个请求检索结果

相关问题 更多 >