更新变更日志时,浮士德如何增加rocksdb中的偏移量?

2024-05-14 23:12:06 发布

您现在位置:Python中文网/ 问答频道 /正文

我只是好奇,当我们使用RocksDB作为状态后端时,faust如何在内部更新changelog。

据我所知,在表更新期间,我们将有下一个行为:

使用_on_changelog_sent回调将新的变更日志发送到kafka变更日志主题(从有关此回调的文档中:这是将偏移量保留在RocksDB中的原因,以便在启动时我们知道数据库中已经有哪些偏移量的数据。

但最重要的问题是:在成功地将changelog消息存储在kafka日志中之后,是否会调用此回调?或者,当我们向kafka发送changelog消息时,通过回调更新rocksdb中的偏移量,但最终发送到kafka将失败?(因为一些卡夫卡集群问题)

在这种情况下,rocksdb中changelog主题的最后偏移量与kafka中的真实高水位偏移量不一致

如果浮士德现在尝试重新启动,那么我们将因为这种不一致性而失败

我这样问是因为我在生产中遇到了这样一个问题,rocksdb中的偏移量大于kafka changelog主题中的最后一个消息偏移量

我认为kafka制作人应该等待确认changelog事件已保存到kafka日志,然后才运行回调以更新rocksdb


Tags: kafka数据文档数据库消息主题on状态

热门问题