我有一个for循环,通过遍历主题名列表来创建kafka数据流。然后我想对每个RDD应用一个函数,但是我需要将主题名传递到函数中。我看到的是,在函数中只有最后一个topic值可用(topic3)。我明白那是因为foreachRDD被懒散地处决了。有没有办法把主题名传进来?我必须做到这一点,而不使用结构化流
topics = ["topic1", "topic2", "topic3"]
ssc = StreamingContext(spark_context, 5)
def process_topic(rdd, topic_value):
if not rdd.isEmpty():
print "topic : "+topic_value
df = rdd.toDF().show()
for topic in topics:
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],{'metadata.broker.list': 'broker_url', 'auto.offset.reset': 'smallest'})
lines = directKafkaStream.map(lambda v: json.loads(v[1]))
lines.foreachRDD(lambda x: process_topic(x, topic))
ssc.start()
time.sleep(10)
ssc.stop()
输出如下所示:
“主题3”
(主题1的df)
“主题3”
(主题2的df)
“主题3”
(主题3的df)
希望看到:
“主题1”
(主题1的df)
“主题2”
(主题2的df)
“主题3”
(主题3的df)
目前没有回答
相关问题 更多 >
编程相关推荐