我们使用pyspark应用程序来处理Kafka中源主题中的一些数据,并将处理后的数据写入单独的主题中。我们还有另一个python应用程序,它使用Kafka python来使用处理过的主题。第一次运行时一切正常
稍后,我们决定向已处理的主题添加另一列。源主题已经包含了这些信息,所以我们停止旧的Kafka/Spark流,开始一个新的流,除了包含新列之外,它的功能完全相同。新的流以startingOffsets
设置为earliest
开始。新列被打印到控制台,因此我假设该列现在包含在已处理的主题中
使用使用者的python应用程序会停止旧使用者并启动新使用者。这一个从startingOffsets
设置为latest
开始。问题是消费者似乎并不使用新处理的数据。不知何故,重新处理的数据不会触发python应用程序的消费。我是不是遗漏了什么
顺便说一句:在使用consumer的python应用程序上使用startingOffsets
设置为earliest
时,它开始使用所有旧数据,但不使用新列的新处理数据
例如:
首轮
源主题包含以下内容:
|column1|column2|column3|column4|column5|column6|
这由pyspark应用程序处理为已处理的主题:
|column1|column2|column4|column6|
python和消费者应用程序使用它。即使一个小时后新数据进来,它也会被处理
更新运行后
源主题包含以下内容:
|column1|column2|column3|column4|column5|column6|
这由pyspark应用程序处理为已处理的主题:
|column1|column2|column4|column5|column6|
python应用程序不会使用它
pyspark应用程序的代码:
self.data_frame = self.spark_session.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_HOST) \
.option("subscribe", self.compute_config.topic_sources[0]) \
.option("startingOffsets", "earliest" if reset_offset is True else "latest") \
.option("failOnDataLoss", False) \
.load()
...
self.ds = self.data_frame.select("key", from_json(col("value").cast("string"), self.schema).alias("value")) \
.withColumn("value", col("value").cast(self.rename_schema)) \
.withColumn("value", to_json("value")) \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_HOST) \
.option("topic", self.compute_config.topic_target) \
.option("checkpointLocation", f"/app/checkpoints/{self.compute_config.topic_target+str(self.compute_config.id)}") \
.start()
来自python应用程序和使用者的代码:
self.consumer = KafkaConsumer(self.sink_config.topic_target,
bootstrap_servers=[KAFKA_HOST],
auto_offset_reset="latest",
enable_auto_commit=False)
for message in self.consumer:
...
目前没有回答
相关问题 更多 >
编程相关推荐