<p>当心几个问题</p>
<ol>
<li>您需要在芹菜配置中将<code>CELERY_SEND_EVENTS</code>标志设置为true。</li>
<li>您还可以在工作线程的新线程中设置事件监视器。</li>
</ol>
<p>以下是我的实现:</p>
<pre><code>class MonitorThread(object):
def __init__(self, celery_app, interval=1):
self.celery_app = celery_app
self.interval = interval
self.state = self.celery_app.events.State()
self.thread = threading.Thread(target=self.run, args=())
self.thread.daemon = True
self.thread.start()
def catchall(self, event):
if event['type'] != 'worker-heartbeat':
self.state.event(event)
# logic here
def run(self):
while True:
try:
with self.celery_app.connection() as connection:
recv = self.celery_app.events.Receiver(connection, handlers={
'*': self.catchall
})
recv.capture(limit=None, timeout=None, wakeup=True)
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
# unable to capture
pass
time.sleep(self.interval)
if __name__ == '__main__':
app = get_celery_app() # returns app
MonitorThread(app)
app.start()
</code></pre>