检测Celery是否可用/正在运行
我正在使用Celery来管理异步任务。不过,有时候Celery的进程会崩溃,这样就导致任务无法执行。我希望能检查一下Celery的状态,确保一切正常。如果发现问题,就给用户显示一个错误信息。从Celery Worker的文档来看,我可能可以使用ping或者inspect来实现这个功能,但使用ping感觉不太靠谱,而且inspect具体该怎么用也不太清楚(如果inspect().registered()是空的呢?)。
如果有人能给点建议就太好了。基本上我想要的是一个像这样的检查方法:
def celery_is_alive():
from celery.task.control import inspect
return bool(inspect().registered()) # is this right??
编辑:看起来在celery 2.3.3上,registered()根本不可用(尽管2.1的文档里有提到)。也许ping是个正确的选择。
编辑:而且ping似乎也没有达到我预期的效果,所以我还是不确定该怎么做。
10 个回答
13
如果你想通过命令行检查 celery 是否在后台运行,可以按照以下步骤操作:
- 先激活你的虚拟环境,然后进入存放 'app' 的文件夹。
- 接着输入命令:
celery -A [app_name] status
- 这个命令会告诉你 celery 是否在运行,以及有多少个节点在线。
24
来自 Celery 4.2 的文档:
from your_celery_app import app
def get_celery_worker_status():
i = app.control.inspect()
availability = i.ping()
stats = i.stats()
registered_tasks = i.registered()
active_tasks = i.active()
scheduled_tasks = i.scheduled()
result = {
'availability': availability,
'stats': stats,
'registered_tasks': registered_tasks,
'active_tasks': active_tasks,
'scheduled_tasks': scheduled_tasks
}
return result
当然,你可以/应该在代码中加入错误处理来改进它……
65
这是我一直在用的代码。celery.task.control.Inspect.stats()
会返回一个字典,里面包含了关于当前可用工作者的很多细节。如果没有工作者在运行,它会返回 None;如果无法连接到消息中间件,它会抛出一个 IOError
错误。我使用的是 RabbitMQ,其他消息系统的表现可能会稍有不同。这在 Celery 2.3.x 和 2.4.x 版本中是有效的;我不确定这个功能能追溯到多早的版本。
def get_celery_worker_status():
ERROR_KEY = "ERROR"
try:
from celery.task.control import inspect
insp = inspect()
d = insp.stats()
if not d:
d = { ERROR_KEY: 'No running Celery workers were found.' }
except IOError as e:
from errno import errorcode
msg = "Error connecting to the backend: " + str(e)
if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED':
msg += ' Check that the RabbitMQ server is running.'
d = { ERROR_KEY: msg }
except ImportError as e:
d = { ERROR_KEY: str(e)}
return d