在SchemaRegistrCyclient汇合Kafka中禁用证书验证

2024-06-08 15:52:03 发布

您现在位置:Python中文网/ 问答频道 /正文

所以,我想读一个来自kafka(Confluent)的主题,其中数据是Avro格式的

由于某些不可避免的原因,我想禁用证书验证

我使用的是security.protocol=SASL_SSL和SASL.mechanism=OAuthBear

我可以通过禁用ssl证书验证连接到Kafka

'enable.ssl.certificate.verification': 'false'

现在,我在尝试使用Schema registry反序列化值时遇到了一个问题。 avro反序列化程序需要模式注册表客户端和模式(可选)。我两个都经过。我通过使用verify=False发出单独的请求来获得第二个参数的值,以获得模式,这很好。但当我试图用它创建一个反序列化的消费者时,问题就出现了

基本上是代码的框架(以及问题所在的注释)

topic="mytopic"
registry_configuration="schema registry url"
url = urljoin(registry_configuration, f'/subjects/{topic}-value/versions/latest')


schema_registry_response = requests.get(url, verify=False) #here I intentionally used verify=False to get the schema
schema_registry_response.raise_for_status()
        
consumption_schema=schema_registry_response.json()['schema']
print(consumption_schema) # This works fine

schema_registry_client = SchemaRegistryClient({'url': registry_configuration})
avro_deserializer = AvroDeserializer(schema_registry_client,consumption_schema
                                             ) # This is the problem area which is called when the poll method is called


string_deserializer = StringDeserializer('utf_8')

basic_conf=_get_basic_configuration()
consumer_conf = {'key.deserializer': string_deserializer,
                 'value.deserializer': avro_deserializer,
                 'group.id': 'myconsumergroupid',
                 'auto.offset.reset': 'earliest'}
consumer_conf.update(basic_conf)

        # update default config with parameter
additional_consumer_conf={}
consumer_conf.update(additional_consumer_conf)
cn=DeserializingConsumer(consumer_conf)    
cn.subscribe(['topicname'])

while True:
  msg=cn.poll(10) # This fails because the deserializingconsumer calls the schema registry and the certificate validation fails

上面看到的get_basic_配置方法

'enable.ssl.certificate.verification': 'false'

错误是

Max retries exceeded with url: /schemas/ids/140 (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain

我已经阅读了SchemaRegistryClienthere的代码,但是我没有看到任何将证书验证设置为false的选项

我还搜索了S.O的帖子和其他合流的文档,看看是否能找到什么,但没有任何帮助

希望这里有人知道更多?我愿意详细说明或澄清任何问题? 如果可能的话,我尽量不使用很多自定义逻辑来反序列化


Tags: theurlsslget序列化basicconsumerschema
1条回答
网友
1楼 · 发布于 2024-06-08 15:52:03

我已经找到了答案

基本上这是S.O post here。尤其是在接受答案之后的答案,如果你使用的是合流卡夫卡

和avro文档here,因为我的模式不是来自文件,而是作为http响应,所以我必须使用avro.schema.parse解析它

最终骨架代码

topic="mytopic"
registry_configuration="schema registry url"
url = urljoin(registry_configuration, f'/subjects/{topic}-value/versions/latest')


schema_registry_response = requests.get(url, verify=False)
schema_registry_response.raise_for_status()
        
consumption_schema=schema_registry_response.json()['schema']
consumption_schema = avro.schema.parse(consumption_schema)

schema_registry_client = SchemaRegistryClient({'url': registry_configuration})

basic_conf=_get_basic_configuration()
consumer_conf = {
                 'group.id': 'myconsumergroupid',
                 'auto.offset.reset': 'earliest'}
consumer_conf.update(basic_conf)


cn=Consumer(consumer_conf)    
cn.subscribe(['mytopic'])
reader = DatumReader(consumption_schema)

while True:
  msg=cn.poll(10)
  if msg is None:
    break
  m=msg.value()
  message_bytes = io.BytesIO(m)
  
  message_bytes.seek(5)
  decoder = BinaryDecoder(message_bytes)
  event_dict = reader.read(decoder) 
  print(event_dict)
  

相关问题 更多 >