confluent_kafka's AvroConsumer raises "'dict' object has no attribute 'get_by_id'"

Published 2024-05-23 21:41:21


When polling, my AvroConsumer from the confluent_kafka.avro module always raises 'dict' object has no attribute 'get_by_id'.

That said, when I poll with a plain Consumer from confluent_kafka, I do get the serialized binary messages back.
The ccloud CLI also works fine against the Kafka cluster.

Any idea why the confluent_kafka client isn't working? Is it my configuration?
I'm using confluent-kafka==1.5.0

Here is a sample of my Python code:

from confluent_kafka.avro import AvroConsumer

conf = {
    'bootstrap.servers': MY_BT_SERVERS,
    'sasl.mechanisms': "PLAIN",
    'security.protocol': "SASL_SSL",
    'sasl.username': API_KEY,
    'sasl.password': API_PASSWORD,
    'group.id': 'group_id',
    'auto.offset.reset': 'earliest'
}

schema_registry_conf = {
    'url': SR_ENDPOINT,
    'basic.auth.user.info': "USER_INFO",
    'schema.registry.basic.auth.user.info': f"{SR_API_KEY}:{SR_API_SECRET}"
}

consumer = AvroConsumer(config=conf, schema_registry=schema_registry_conf)

consumer.subscribe(["my-topic"])

message = consumer.poll(5)

This raises:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-22-95673a1ff746> in <module>
----> message = consumer.poll(5)


lib/python3.7/site-packages/confluent_kafka/avro/__init__.py in poll(self, timeout)
    164             try:
    165                 if message.value() is not None:
--> 166                     decoded_value = self._serializer.decode_message(message.value(), is_key=False)
    167                     message.set_value(decoded_value)
    168                 if message.key() is not None:

/lib/python3.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py in decode_message(self, message, is_key)
    229             if magic != MAGIC_BYTE:
    230                 raise SerializerError("message does not start with magic byte")
--> 231             decoder_func = self._get_decoder_func(schema_id, payload, is_key)
    232             return decoder_func(payload)

/lib/python3.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py in _get_decoder_func(self, schema_id, payload, is_key)
    161         # fetch writer schema from schema reg
    162         try:
--> 163             writer_schema_obj = self.registry_client.get_by_id(schema_id)
    164         except ClientError as e:
    165             raise SerializerError("unable to fetch schema with id %d: %s" % (schema_id, str(e)))

AttributeError: 'dict' object has no attribute 'get_by_id'

As a clue, I'd also point out that every serialized message I poll starts with strange \x00\x00\x01\x86\xa1 bytes, which I have to strip when I deserialize the data manually.

Thanks for your help


Tags: kafka, key, self, api, id, message, get, is
1 answer

Answer #1 · Posted by a forum user on 2024-05-23 21:41:21

Your error is here: schema_registry=schema_registry_conf

You passed a dict where you should be passing an instance of the registry client.
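
For the consumer in the question, a minimal sketch of the fix could look like the following. It reuses the asker's conf, SR_ENDPOINT, SR_API_KEY and SR_API_SECRET placeholders and assumes the legacy client reads its basic-auth credentials from the 'basic.auth.credentials.source' / 'basic.auth.user.info' keys:

from confluent_kafka.avro import AvroConsumer, CachedSchemaRegistryClient

# Build an actual registry client instead of handing AvroConsumer a raw dict
schema_registry = CachedSchemaRegistryClient({
    'url': SR_ENDPOINT,
    # assumed auth keys: 'USER_INFO' tells the client to take the
    # credentials from 'basic.auth.user.info'
    'basic.auth.credentials.source': 'USER_INFO',
    'basic.auth.user.info': f"{SR_API_KEY}:{SR_API_SECRET}",
})

consumer = AvroConsumer(conf, schema_registry=schema_registry)
consumer.subscribe(["my-topic"])
message = consumer.poll(5)  # the Avro payload is now decoded via the registry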

Producer example

import os

from confluent_kafka import Producer
from confluent_kafka.avro import CachedSchemaRegistryClient
# the legacy MessageSerializer exposes encode_record_with_schema
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer as AvroSerializer

registry_conf = {
    'url': os.environ.get('SCHEMA_REGISTRY', 'http://localhost:8081')
}
schema_registry = CachedSchemaRegistryClient(registry_conf)

avro_serializer = AvroSerializer(schema_registry)
serialize_avro = avro_serializer.encode_record_with_schema  # extract the bound method

p = Producer(...)
value = ...
value_payload = serialize_avro(topic, value_schema, value, is_key=False)
p.produce(topic, key=key, value=value_payload, callback=delivery_report)
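
As a usage note (a sketch, assuming AvroConsumer strips the schema.registry. prefix from config keys and builds the registry client internally), the explicit client instance can also be skipped by folding the registry settings into the consumer config:

from confluent_kafka.avro import AvroConsumer

conf_with_registry = {
    **conf,  # broker/SASL settings from the question
    'schema.registry.url': SR_ENDPOINT,
    # assumed key names; the prefix is removed and the remainder is handed to
    # the internally created CachedSchemaRegistryClient
    'schema.registry.basic.auth.credentials.source': 'USER_INFO',
    'schema.registry.basic.auth.user.info': f"{SR_API_KEY}:{SR_API_SECRET}",
}

consumer = AvroConsumer(conf_with_registry)
consumer.subscribe(["my-topic"])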
