卡夫卡消费者不使用Spark消费重新处理的数据

2024-05-14 10:23:16 发布

您现在位置:Python中文网/ 问答频道 /正文

我们使用pyspark应用程序来处理Kafka中源主题中的一些数据,并将处理后的数据写入单独的主题中。我们还有另一个python应用程序,它使用Kafka python来使用处理过的主题。第一次运行时一切正常

稍后,我们决定向已处理的主题添加另一列。源主题已经包含了这些信息,所以我们停止旧的Kafka/Spark流,开始一个新的流,除了包含新列之外,它的功能完全相同。新的流以startingOffsets设置为earliest开始。新列被打印到控制台,因此我假设该列现在包含在已处理的主题中

使用使用者的python应用程序会停止旧使用者并启动新使用者。这一个从startingOffsets 设置为latest开始。问题是消费者似乎并不使用新处理的数据。不知何故,重新处理的数据不会触发python应用程序的消费。我是不是遗漏了什么

顺便说一句:在使用consumer的python应用程序上使用startingOffsets设置为earliest时,它开始使用所有旧数据,但不使用新列的新处理数据

例如:

首轮

源主题包含以下内容:

|column1|column2|column3|column4|column5|column6|

这由pyspark应用程序处理为已处理的主题:

|column1|column2|column4|column6|

python和消费者应用程序使用它。即使一个小时后新数据进来,它也会被处理

更新运行后

源主题包含以下内容:

|column1|column2|column3|column4|column5|column6|

这由pyspark应用程序处理为已处理的主题:

|column1|column2|column4|column5|column6|

python应用程序不会使用它

更新

pyspark应用程序的代码:

self.data_frame = self.spark_session.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", KAFKA_HOST) \
            .option("subscribe", self.compute_config.topic_sources[0]) \
            .option("startingOffsets", "earliest" if reset_offset is True else "latest") \
            .option("failOnDataLoss", False) \
            .load()

...

self.ds = self.data_frame.select("key", from_json(col("value").cast("string"), self.schema).alias("value")) \
            .withColumn("value", col("value").cast(self.rename_schema)) \
            .withColumn("value", to_json("value")) \
            .writeStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", KAFKA_HOST) \
            .option("topic", self.compute_config.topic_target) \
            .option("checkpointLocation", f"/app/checkpoints/{self.compute_config.topic_target+str(self.compute_config.id)}") \
            .start()

来自python应用程序和使用者的代码:

 self.consumer = KafkaConsumer(self.sink_config.topic_target,
                                      bootstrap_servers=[KAFKA_HOST],
                                      auto_offset_reset="latest",
                                      enable_auto_commit=False)

for message in self.consumer:
    ...

Tags: 数据selfconfig应用程序主题topicvalue使用者

热门问题