芹菜:接收到类型为<AsyncResult:[hash]>

2024-04-19 20:51:22 发布

您现在位置:Python中文网/ 问答频道 /正文

在我在stackOverflow上看到的所有类似问题中:

错误说明未注册的任务的名称。我有一个不同的问题。不显示任务的名称,而是Received unregistered task of type <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>,结果是KeyError: <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>。在

这是我的回溯:

KeyError: <AsyncResult: 4aca05f8-14c6-4a25-988a-ff605a27871d>
[2016-06-15 14:11:46,016: ERROR/MainProcess] Received unregistered task of type <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you are using relative imports?

The full contents of the message body was:
{'utc': True, 'chord': None, 'args': [], 'retries': 0, 'expires': None, 'task': <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>, 'callbacks': None, 'errbacks': None, 'timelimit': (None, None), 'taskset': 'a6e8d1c0-c75b-471e-b21f-af8492592aeb', 'kwargs': {}, 'eta': None, 'id': '0dffed5f-3090-417c-a9ec-c99e11bc9579'} (568b)
Traceback (most recent call last):
File "/Users/me/Developer/virtualenvironments/project_name/lib/python2.7/site-packages/celery/worker/consumer.py", line 456, in on_task_received
strategies[name](message, body,
KeyError: <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>

我的celery应用程序包含文件,其中我只有3个任务:

app/芹菜_应用程序副本公司名称:

^{pr2}$

应用程序/任务/资产.py公司名称:

from __future__ import absolute_import
from celery import current_app

@current_app.task(name='app.tasks.assets.list_assets')
def list_assets(*args, **kwargs):
    print "list assets"

@current_app.task(name='app.tasks.assets.massage_assets')
def massage_assets(assets):
    print "massaging assets"

@current_app.task(name='app.tasks.assets.save_assets', ignore_result=True)
def save_assets(assets):
    print "saving assets..."

这些错误只发生在队列“芹菜”(我不使用)和“本地测试”中。在

所有这些任务的适当队列都会打印出来并按预期工作,但不知何故,名为“celery”和“local_testing”的队列正在被填满(相同的队列大小),并且除了一次又一次的回溯之外,什么都没有输出。在

我是这样称呼这些任务的。。。在

应用程序/流程/进程.py公司名称:

from celery import group
class Process(object):
    def run_process(self, resource_generator, chain_signature):
        tasks = []
        for resources in resource_generator:
            tasks.append(chain_signature(resources))
        group(tasks)()

应用程序/流程/资产.py公司名称:

from __future__ import absolute_import

from app.processes.processes import Process
from app.indexes.asset import AssetIndex
from app.tasks.assets import *

class AssetProcess(Process):
    def run(self):
        Process.run_process(self,
                            resource_generator=AssetIndex.asset_generator(),
                            chain_signature=(
                                list_assets.s() | 
                                massage_assets.s() | 
                                save_assets.s()))

同样,默认队列被设置为“local_testing”,因此我不确定是如何通过管道将任何内容传输到“celery”队列的。我得到的回溯也没什么用。在

我将从app/上方的目录启动celery worker(使用“celery”队列,或使用本地测试队列(-Q local_testing)),如下所示:

celery -A app.celery_app worker -l info -n worker3.%h

非常感谢任何帮助。在

干杯!在


Tags: offromimport名称noneapptask队列
1条回答
网友
1楼 · 发布于 2024-04-19 20:51:22

我已经确定了问题所在,这是通过使用组。在

通过向链签名传递一个参数,它将自动异步应用。通过使用group,我将asyncResult对象分组,这没有任何意义。我彻底改变了行刑方式:

def run_process(self, resource_generator, chain_signature):
    for resources in resource_generator:
        chain_signature(resources)

不管怎样,这有效地实现了我想要的。在

干杯

相关问题 更多 >