<h3>实时处理</h3>
<p>要实时处理事件,您需要以下几点</p>
<ul>
<li><p>事件使用者(这是<code>Receiver</code>)</p></li>
<li><p>事件进入时调用的一组处理程序</p>
<blockquote>
<p>You can have different handlers for each event type, or a catch-all handler can be used ('*')</p>
</blockquote></li>
<li><p>州(可选)</p>
<p><code>app.events.State</code>是集群中任务和工作者的一种方便的内存表示形式,随着事件的到来而更新</p>
<p>它封装了许多常见问题的解决方案,比如检查工作人员是否仍然活着(通过验证心跳),在事件发生时将事件字段合并在一起,确保时间戳同步,等等</p></li>
</ul>
<p>结合这些功能,您可以轻松地实时处理事件:</p>
<pre class="lang-py prettyprint-override"><code>from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
print('TASK FAILED: %s[%s] %s' % (
task.name, task.uuid, task.info(),))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
'*': state.event,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
my_monitor(app)
</code></pre>
<p><strong>注意</strong>:<code>capture</code>的<code>wakeup</code>参数向所有工作人员发送信号,强制他们发送心跳。这样,当监视器启动时,您可以立即看到工人</p>
<p>您可以通过指定处理程序来侦听特定事件:</p>
<pre class="lang-py prettyprint-override"><code>from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
print('TASK FAILED: %s[%s] %s' % (
task.name, task.uuid, task.info(),))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
my_monitor(app)
</code></pre>
<p><sub><b/><a href="https://docs.celeryproject.org/en/stable/userguide/monitoring.html#real-time-processing" rel="nofollow noreferrer">Monitoring and Management Guide — Celery 4.4.2 documention</a></sub></p>