区分二进制编码的Avro和JSON消息

2024-04-25 21:20:44 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用python来读取来自不同主题的消息。有些主题的消息是用普通JSON编码的,而另一些主题则使用Avro二进制序列化,带有合流的schema registry。在

当我收到一条信息时,我需要知道它是否需要解码。目前,我只依赖于这样一个事实:二进制编码的消息以MAGIC_BYTE开头,该值为零:

from confluent_kafka.cimpl import Consumer

consumer = Consumer(config)
consumer.subsrcibe(...)
msg = consumer.poll()
# check the msg is not null or error etc
if msg.values()[0] == 0:
      # It is binary encoded
else:
      # It is json

我想知道有没有更好的方法?在


Tags: json消息编码主题序列化consumerisschema
2条回答

然后,您可以得到消息的0-5字节

magic_byte = message_bytes[0]
schema_id = message_bytes[1:5]

然后,对注册表执行GET /schemas/{schema_id}的查找,并在得到200响应代码时缓存ID+schema(如果需要)。在

否则,消息要么是JSON,要么是生产者将其数据发送到另一个注册中心(如果您的环境中有多个注册中心)。注意:这意味着数据可能仍然是Avro

您可以先通过REST查询schema注册表,然后构建在那里注册的主题的本地缓存。然后,当您试图解码来自某个特定主题的消息时,只需将该主题与该列表的内容进行比较。如果它在那里,你知道它已经被解码了。在

当然,只有当Avro编码的所有主题都使用schemaregistry时,这才有效。如果您曾经收到一个Avro编码的消息,它是在schemaregistry中注册的而不是,那么它将不起作用。在

相关问题 更多 >