我在pythonspark应用程序中创建了一个kafka流,可以解析任何通过它的文本。在
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
我想改变它,以便能够解析来自kafka主题的avro消息。解析来自文件的avro消息时,我的操作方式如下:
^{pr2}$我是python和spark的新手,如何更改流以解析avro消息?另外,我如何指定在从Kafka读取Avro消息时要使用的模式???我以前用java做过这些,但是python让我很困惑。在
编辑:
我试着换上avro解码器
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1},valueDecoder=avro.io.DatumReader(schema))
但是我得到了以下错误
TypeError: 'DatumReader' object is not callable
正如@Zoltan Fedor在评论中提到的那样,所提供的答案现在有点过时了,因为它已经编写了2.5年了。confluent-kafka-python库已经发展为在本机支持相同的功能。给定代码中唯一需要的更改如下。在
然后,你可以改变这条线-
^{pr2}$我已经测试过了,效果很好。我可以在将来为任何需要的人提供答案。在
我也遇到了同样的挑战——在pyspark中反序列化来自Kafka的avro消息,并使用Confluent Schema Registry模块的Messageserializer方法解决了这个问题,因为在我们的例子中,模式存储在一个合流的模式注册表中。在
您可以在https://github.com/verisign/python-confluent-schemaregistry找到该模块
很明显,正如您所看到的,这段代码使用的是新的直接方法,没有接收者,因此产生了createdDirectStream(更多信息请参阅https://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html)
相关问题 更多 >
编程相关推荐