如何处理faust应用程序崩溃后重复出现的未确认消息

2024-05-13 03:57:52 发布

您现在位置:Python中文网/ 问答频道 /正文

我有这样的效果。有时我的浮士德工人崩溃与python脚本终止。之后,脚本重新启动后,将再次接收最后处理的kafka消息。我可以终止脚本并重新运行它,但会再次收到相同的未确认的卡夫卡消息

我刚开始和浮士德合作,但我想:

  1. 似乎faust通常在代理函数中捕捉异常。据我所知,它捕获了一些标准异常,如ValueError、AttributeError等等。之后,应确认已处理的消息,并继续流处理。但是一些非标准的异常可以绕过except部分,导致所有脚本崩溃

  2. 崩溃后,处理过的消息仍然没有被确认,似乎卡夫卡等待已经崩溃的代理确认了20分钟

  3. 脚本重新启动后,卡夫卡认为它是一个新的订户。所以它向它发送未确认的消息,并等待这个新代理和崩溃的代理都确认消息。每次我重新启动脚本时都会这样。每次消息(因为崩溃的代理无法确认)都会被脚本接收,直到消息的ttl过期

如果消息的格式不好,那么情况会更糟,这就是脚本每次重新启动都会被错误终止的原因

如何处理这样的情况,在这样的崩溃之后有一个稳定的脚本行为


Tags: kafka函数脚本消息代理标准情况attributeerror