Celery任务状态总是待处理

22 投票
3 回答
23387 浏览
提问于 2025-04-18 09:20

我对celery和django还很陌生,所以请多多包涵。我正在尝试运行一个测试,进行一些计算,并等待测试完成,以确保结果是正确的。

这是我目前的代码:

在 app/tests.py 文件中

from tasks import *


c = calculate.apply_async(args=[1])

# wait until the task is done
while not calculate.AsyncResult(c.id).status == "SUCCESS":
    print c.state
    pass

在 app/tasks.py 文件中

from celery import shared_task

@shared_task
def calculate(proj_id):

    #some calculations followed by a save of the object

尽管在celery的日志中显示任务已经成功完成,但状态始终保持为“待处理”。

[2014-06-10 17:55:11,417: INFO/MainProcess] Received task: app.tasks.calculate[1f11e7ab-0add-42df-beac-3d94c6868aac]
[2014-06-10 17:55:11,505: INFO/MainProcess] Task app.tasks.calculate[1f11e7ab-0add-42df-beac-3d94c6868aac] succeeded in 0.0864518239978s: None

我还在 mainapp/settings.py 中设置了 CELERY_IGNORE_RESULT = False,但这似乎没有任何效果。

3 个回答

0

如果你在使用旧版的 django-celeryRabbitMQ 作为结果存储后端,那么这些设置可能会对你有帮助:

# Mostly, all settings are the same as in other answers

CELERY_RESULT_BACKEND = 'rpc://'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_IGNORE_RESULT = False

# This line is what I needed
CELERY_TRACK_STARTED = True
15

直接来自文档:结果后端无法工作或任务始终处于PENDING状态。

所有任务默认都是PENDING状态,所以这个状态其实更应该叫“未知”。当你发送一个任务时,Celery并不会更新任何状态,任何没有历史记录的任务都会被认为是待处理的(毕竟你是知道任务的id的)。

  1. 确保任务没有启用ignore_result选项。

    如果启用这个选项,工作进程会跳过更新状态的步骤。

  2. 确保CELERY_IGNORE_RESULT设置没有启用。

  3. 确保没有旧的工作进程仍在运行。

    很容易不小心启动多个工作进程,所以在启动新的工作进程之前,确保之前的工作进程已经正确关闭。

    如果有一个旧的工作进程没有配置正确的结果后端,它可能会干扰任务的执行。

    可以将–pidfile参数设置为一个绝对路径,以确保不会发生这种情况。

  4. 确保客户端配置了正确的后端。

如果客户端配置使用的后端和工作进程不同,你将无法收到结果,所以请通过检查来确保后端是正确的:

>>> result = task.delay(…)
>>> print(result.backend)
8

所以,你的设置有问题。:) 你还需要为celery设置一个代理才能正常工作。

首先,djcelery已经不再使用了,所有功能都已经包含在celery里,可以和django一起使用。

其次,不要设置所有内容都可以接受,这可能会带来安全风险。只有在简单的json不够用的情况下,才使用pickle(比如你需要把函数或对象作为参数传递给任务,或者从任务中返回这些内容)。

我猜测,你只是想试试celery,这就是你尝试使用数据库作为后端的原因,这没问题,但如果是用于生产环境,我建议使用RabbitMQ。

无论如何,试试这些设置:

BROKER_URL = 'django://'
INSTALLED_APPS = (
    ...
    'kombu.transport.django', 
    ...
)
CELERY_RESULT_BACKEND = 'db+scheme://user:password@host:port/dbname' 
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_IGNORE_RESULT = False # this is less important

然后运行python manage.py syncdb

顺便告诉你,我没有使用数据库作为代理或结果后端,所以设置可能不完整,甚至可能不正确,但还是可以试试。

更多关于CELERY_RESULT_BACKEND的数据库设置示例

如果你想设置RabbitMQ作为代理后端,我推荐这样做,我也很确定它能正常工作:

如果你在ubuntu上,运行:

sudo apt-get install rabbitmq-server
sudo rabbitmqctl add_user <username> <password>
sudo rabbitmqctl add_vhost <vhost, use project name for example>
sudo rabbitmqctl set_permissions -p <vhost> <username"> ".*" ".*" ".*"

然后在settings.py中配置celery:

BROKER_URL = 'amqp://<user>:<password>@localhost:5672/<vhost>'
CELERY_TIMEZONE = TIME_ZONE
CELERY_RESULT_BACKEND = 'amqp'
# thats where celery will store scheduled tasks in case you restart the broker:
CELERYD_STATE_DB = "/full/path/data/celery_worker_state" 
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

告诉我结果如何。

撰写回答