如何在Celery-Django应用中监控工作事件?
根据关于 Celery工作进程实时监控 的教程,我们可以通过编程的方式捕捉工作进程产生的事件,并根据这些事件采取相应的行动。
我想知道如何在一个Celery-Django应用中,像这个例子那样集成一个监控工具?
编辑: 教程中的代码示例看起来是这样的:
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
task_id = event['uuid']
print('TASK FAILED: %s[%s] %s' % (
event['name'], task_id, state[task_id].info(), ))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
'worker-heartbeat': announce_dead_workers,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
celery = Celery(broker='amqp://guest@localhost//')
my_monitor(celery)
我想捕捉工作进程发送的task_failed事件,并像教程中展示的那样获取它的task_id,以便从为我的应用配置的结果后端获取这个任务的结果,并进一步处理。我的问题是,在django-celery项目中,Celery库的实例化对我来说并不明显,我不知道该如何获取应用程序。
我也欢迎任何其他想法,关于如何在工作进程完成任务后处理结果。
2 个回答
8
注意几个需要留意的地方
- 你需要在你的celery配置中把
CELERY_SEND_EVENTS
这个选项设置为真。 - 你还可以在你的工作线程中开启一个新的线程来设置事件监控。
下面是我的实现:
class MonitorThread(object):
def __init__(self, celery_app, interval=1):
self.celery_app = celery_app
self.interval = interval
self.state = self.celery_app.events.State()
self.thread = threading.Thread(target=self.run, args=())
self.thread.daemon = True
self.thread.start()
def catchall(self, event):
if event['type'] != 'worker-heartbeat':
self.state.event(event)
# logic here
def run(self):
while True:
try:
with self.celery_app.connection() as connection:
recv = self.celery_app.events.Receiver(connection, handlers={
'*': self.catchall
})
recv.capture(limit=None, timeout=None, wakeup=True)
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
# unable to capture
pass
time.sleep(self.interval)
if __name__ == '__main__':
app = get_celery_app() # returns app
MonitorThread(app)
app.start()
22
好的,我找到了一种方法来实现这个,虽然我不确定这是不是最好的解决方案,但对我来说有效。这个监控功能基本上是直接连接到消息代理,并监听不同类型的事件。我的代码是这样的:
from celery.events import EventReceiver
from kombu import Connection as BrokerConnection
def my_monitor:
connection = BrokerConnection('amqp://guest:guest@localhost:5672//')
def on_event(event):
print "EVENT HAPPENED: ", event
def on_task_failed(event):
exception = event['exception']
print "TASK FAILED!", event, " EXCEPTION: ", exception
while True:
try:
with connection as conn:
recv = EventReceiver(conn,
handlers={'task-failed' : on_task_failed,
'task-succeeded' : on_event,
'task-sent' : on_event,
'task-received' : on_event,
'task-revoked' : on_event,
'task-started' : on_event,
# OR: '*' : on_event
})
recv.capture(limit=None, timeout=None)
except (KeyboardInterrupt, SystemExit):
print "EXCEPTION KEYBOARD INTERRUPT"
sys.exit()
就这些。我是在一个和正常应用程序不同的进程中运行这个,也就是说,我创建了一个我的celery应用程序的子进程,专门运行这个功能。
希望这对你有帮助。