Spark Python Avro Kafka反序列化

2024-05-15 14:37:23 发布

您现在位置:Python中文网/ 问答频道 /正文

我在pythonspark应用程序中创建了一个kafka流,可以解析任何通过它的文本。在

            kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})

我想改变它,以便能够解析来自kafka主题的avro消息。解析来自文件的avro消息时,我的操作方式如下:

^{pr2}$

我是python和spark的新手,如何更改流以解析avro消息?另外,我如何指定在从Kafka读取Avro消息时要使用的模式???我以前用java做过这些,但是python让我很困惑。在

编辑:

我试着换上avro解码器

            kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1},valueDecoder=avro.io.DatumReader(schema))

但是我得到了以下错误

            TypeError: 'DatumReader' object is not callable

Tags: kafka应用程序消息topicconsumeravrosparkstreaming
2条回答

正如@Zoltan Fedor在评论中提到的那样,所提供的答案现在有点过时了,因为它已经编写了2.5年了。confluent-kafka-python库已经发展为在本机支持相同的功能。给定代码中唯一需要的更改如下。在

from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

然后,你可以改变这条线-

^{pr2}$

我已经测试过了,效果很好。我可以在将来为任何需要的人提供答案。在

我也遇到了同样的挑战——在pyspark中反序列化来自Kafka的avro消息,并使用Confluent Schema Registry模块的Messageserializer方法解决了这个问题,因为在我们的例子中,模式存储在一个合流的模式注册表中。在

您可以在https://github.com/verisign/python-confluent-schemaregistry找到该模块

from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer
schema_registry_client = CachedSchemaRegistryClient(url='http://xx.xxx.xxx:8081')
serializer = MessageSerializer(schema_registry_client)


# simple decode to replace Kafka-streaming's built-in decode decoding UTF8 ()
def decoder(s):
    decoded_message = serializer.decode_message(s)
    return decoded_message

kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"}, valueDecoder=decoder)

lines = kvs.map(lambda x: x[1])
lines.pprint()

很明显,正如您所看到的,这段代码使用的是新的直接方法,没有接收者,因此产生了createdDirectStream(更多信息请参阅https://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html

相关问题 更多 >