如何在for循环中使用参数运行foreachRDD

2024-04-26 14:38:10 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个for循环,通过遍历主题名列表来创建kafka数据流。然后我想对每个RDD应用一个函数,但是我需要将主题名传递到函数中。我看到的是,在函数中只有最后一个topic值可用(topic3)。我明白那是因为foreachRDD被懒散地处决了。有没有办法把主题名传进来?我必须做到这一点,而不使用结构化流

topics = ["topic1", "topic2", "topic3"]
ssc = StreamingContext(spark_context, 5)

def process_topic(rdd, topic_value):
    if not rdd.isEmpty():
        print "topic : "+topic_value
        df = rdd.toDF().show()

for topic in topics:
    directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],{'metadata.broker.list': 'broker_url', 'auto.offset.reset': 'smallest'})
    lines = directKafkaStream.map(lambda v: json.loads(v[1]))
    lines.foreachRDD(lambda x: process_topic(x, topic))

ssc.start()
time.sleep(10)
ssc.stop()

输出如下所示:

“主题3”

(主题1的df)

“主题3”

(主题2的df)

“主题3”

(主题3的df)

希望看到:

“主题1”

(主题1的df)

“主题2”

(主题2的df)

“主题3”

(主题3的df)


Tags: 函数df主题fortopicvaluebrokerprocess