检测Celery是否可用/正在运行

71 投票
10 回答
57175 浏览
提问于 2025-04-17 08:22

我正在使用Celery来管理异步任务。不过,有时候Celery的进程会崩溃,这样就导致任务无法执行。我希望能检查一下Celery的状态,确保一切正常。如果发现问题,就给用户显示一个错误信息。从Celery Worker的文档来看,我可能可以使用ping或者inspect来实现这个功能,但使用ping感觉不太靠谱,而且inspect具体该怎么用也不太清楚(如果inspect().registered()是空的呢?)。

如果有人能给点建议就太好了。基本上我想要的是一个像这样的检查方法:

def celery_is_alive():
    from celery.task.control import inspect
    return bool(inspect().registered()) # is this right??

编辑:看起来在celery 2.3.3上,registered()根本不可用(尽管2.1的文档里有提到)。也许ping是个正确的选择。

编辑:而且ping似乎也没有达到我预期的效果,所以我还是不确定该怎么做。

10 个回答

13

如果你想通过命令行检查 celery 是否在后台运行,可以按照以下步骤操作:

  • 先激活你的虚拟环境,然后进入存放 'app' 的文件夹。
  • 接着输入命令:celery -A [app_name] status
  • 这个命令会告诉你 celery 是否在运行,以及有多少个节点在线。

来源: http://michal.karzynski.pl/blog/2014/05/18/setting-up-an-asynchronous-task-queue-for-django-using-celery-redis/

24

来自 Celery 4.2 的文档:

from your_celery_app import app


def get_celery_worker_status():
    i = app.control.inspect()
    availability = i.ping()
    stats = i.stats()
    registered_tasks = i.registered()
    active_tasks = i.active()
    scheduled_tasks = i.scheduled()
    result = {
        'availability': availability,
        'stats': stats,
        'registered_tasks': registered_tasks,
        'active_tasks': active_tasks,
        'scheduled_tasks': scheduled_tasks
    }
    return result

当然,你可以/应该在代码中加入错误处理来改进它……

65

这是我一直在用的代码。celery.task.control.Inspect.stats() 会返回一个字典,里面包含了关于当前可用工作者的很多细节。如果没有工作者在运行,它会返回 None;如果无法连接到消息中间件,它会抛出一个 IOError 错误。我使用的是 RabbitMQ,其他消息系统的表现可能会稍有不同。这在 Celery 2.3.x 和 2.4.x 版本中是有效的;我不确定这个功能能追溯到多早的版本。

def get_celery_worker_status():
    ERROR_KEY = "ERROR"
    try:
        from celery.task.control import inspect
        insp = inspect()
        d = insp.stats()
        if not d:
            d = { ERROR_KEY: 'No running Celery workers were found.' }
    except IOError as e:
        from errno import errorcode
        msg = "Error connecting to the backend: " + str(e)
        if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED':
            msg += ' Check that the RabbitMQ server is running.'
        d = { ERROR_KEY: msg }
    except ImportError as e:
        d = { ERROR_KEY: str(e)}
    return d

撰写回答