Pyspark流式处理。RDD转DF并重复使用此DF。

2024-04-29 15:35:13 发布

您现在位置:Python中文网/ 问答频道 /正文

我已经安装了Spark 2.1.0并将Kafka连接到Spark Streaming。我使用python3.5。 默认情况下,就我所理解的Spark Streaming,我使用rdd,但我想使用数据帧。所以我把RDD转换成DF,但是我不能使用它。显然,foreachRDD操作没有返回任何结果。在

我可以打印出数据帧,但不能用于进一步的计算。在

问题:

我错过了什么?在

kafkaStream = KafkaUtils.createStream(ssc, "10.0.26.44:2183",
                                      'spark-streaming', {'topic': 1})


pipelined_rdd = kafkaStream.filter(lambda v: is_valid_json(v))
pipelined_rdd = pipelined_rdd.map(lambda v: parse_events(v))

pipelined_rdd.foreachRDD(convert_to_df)

其中函数convert_to_df()

^{pr2}$

Tags: kafkato数据lambdaconvertdf情况spark