pyspark流式提交

2024-05-16 16:06:43 发布

您现在位置:Python中文网/ 问答频道 /正文

根据文档,从(scala)spark流应用程序中commit offset into kafka是可能的。 我想从pyspark实现相同的功能。
或者至少将kafka分区、偏移量存储到外部数据存储(RDBMS等)。在

然而,用于kafka集成的pysparkaapi只提供RDD(offset, value)],而不是{}(如scala中所示)。 有没有办法从pythonrdd获得(topic, partition, offset)?再坚持到哪里去?在


Tags: kafka数据文档功能应用程序sparkpysparkoffset
1条回答
网友
1楼 · 发布于 2024-05-16 16:06:43

我们可以用多种方式处理偏移量。其中一种方法是在每次成功处理数据时,在Zookeeper路径中存储偏移值,并在再次创建流时读取该值。代码段如下。在

from kazoo.client import KazooClient
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
ZOOKEEPER_SERVERS = "127.0.0.1:2181"

def get_zookeeper_instance():
    from kazoo.client import KazooClient
    if 'KazooSingletonInstance' not in globals():
        globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
        globals()['KazooSingletonInstance'].start()
    return globals()['KazooSingletonInstance']

def save_offsets(rdd):
    zk = get_zookeeper_instance()
    for offset in rdd.offsetRanges():
        path = f"/consumers/{var_topic_src_name}"
        print(path)
        zk.ensure_path(path)
        zk.set(path, str(offset.untilOffset).encode())

    var_offset_path = f'/consumers/{var_topic_src_name}'

    try:
        var_offset = int(zk.get(var_offset_path)[0])
    except:
        print("The spark streaming started First Time and Offset value should be Zero")
        var_offset  = 0
    var_partition = 0
    enter code here
    topicpartion = TopicAndPartition(var_topic_src_name, var_partition)
    fromoffset = {topicpartion: var_offset}
    print(fromoffset)
    kvs = KafkaUtils.createDirectStream(ssc,\
                                        [var_topic_src_name],\
                                        var_kafka_parms_src,\
                                        valueDecoder=serializer.decode_message,\
                                        fromOffsets = fromoffset)
    kvs.foreachRDD(handler)
    kvs.foreachRDD(save_offsets)

问候

Karthikeyan Rasipalayam Durairaj公司

相关问题 更多 >