根据芹菜教程中有关real-time monitoring of celery workers的内容,还可以通过编程捕获工人生成的事件并相应地采取行动。
我的问题是,如何在芹菜Django应用程序中将监视器集成为this示例中的监视器?
编辑: 教程中的代码示例如下:
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
task_id = event['uuid']
print('TASK FAILED: %s[%s] %s' % (
event['name'], task_id, state[task_id].info(), ))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
'worker-heartbeat': announce_dead_workers,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
celery = Celery(broker='amqp://guest@localhost//')
my_monitor(celery)
因此,我想捕获工作进程发送的task_failed事件,并像教程中显示的那样获取其task_id,从为我的应用程序配置的结果后端获取此任务的结果并进一步处理它。我的问题是,如何获取应用程序对我来说并不明显,因为在django芹菜项目中,芹菜库的实例化对我来说并不透明。
对于如何在工作人员完成执行任务后处理结果,我也持开放态度。
好吧,我找到了一种方法,虽然我不确定这是否是解决方案,但它对我有效。monitor函数基本上直接连接到代理并监听不同类型的事件。我的代码如下:
就这些。我在一个不同于普通应用程序的进程中运行它,这意味着我创建了一个芹菜应用程序的子进程,它只运行这个函数。 高温高压
当心几个问题
CELERY_SEND_EVENTS
标志设置为true。以下是我的实现:
相关问题 更多 >
编程相关推荐