2024-05-16 16:06:43 发布
网友
根据文档,从(scala)spark流应用程序中commit offset into kafka是可能的。 我想从pyspark实现相同的功能。 或者至少将kafka分区、偏移量存储到外部数据存储(RDBMS等)。在
然而,用于kafka集成的pysparkaapi只提供RDD(offset, value)],而不是{}(如scala中所示)。 有没有办法从pythonrdd获得(topic, partition, offset)?再坚持到哪里去?在
RDD(offset, value)]
(topic, partition, offset)
我们可以用多种方式处理偏移量。其中一种方法是在每次成功处理数据时,在Zookeeper路径中存储偏移值,并在再次创建流时读取该值。代码段如下。在
from kazoo.client import KazooClient zk = KazooClient(hosts='127.0.0.1:2181') zk.start() ZOOKEEPER_SERVERS = "127.0.0.1:2181" def get_zookeeper_instance(): from kazoo.client import KazooClient if 'KazooSingletonInstance' not in globals(): globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS) globals()['KazooSingletonInstance'].start() return globals()['KazooSingletonInstance'] def save_offsets(rdd): zk = get_zookeeper_instance() for offset in rdd.offsetRanges(): path = f"/consumers/{var_topic_src_name}" print(path) zk.ensure_path(path) zk.set(path, str(offset.untilOffset).encode()) var_offset_path = f'/consumers/{var_topic_src_name}' try: var_offset = int(zk.get(var_offset_path)[0]) except: print("The spark streaming started First Time and Offset value should be Zero") var_offset = 0 var_partition = 0 enter code here topicpartion = TopicAndPartition(var_topic_src_name, var_partition) fromoffset = {topicpartion: var_offset} print(fromoffset) kvs = KafkaUtils.createDirectStream(ssc,\ [var_topic_src_name],\ var_kafka_parms_src,\ valueDecoder=serializer.decode_message,\ fromOffsets = fromoffset) kvs.foreachRDD(handler) kvs.foreachRDD(save_offsets)
问候
Karthikeyan Rasipalayam Durairaj公司
我们可以用多种方式处理偏移量。其中一种方法是在每次成功处理数据时,在Zookeeper路径中存储偏移值,并在再次创建流时读取该值。代码段如下。在
问候
Karthikeyan Rasipalayam Durairaj公司
相关问题 更多 >
编程相关推荐