Connecting to Confluent Cloud with the AIOKafka client

Published 2024-05-29 01:37:14

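I am trying to connect to my Confluent Cloud Kafka cluster using a modified version of the example from the AIOKafka repo at https://github.com/aio-libs/aiokafka/blob/master/examples/ssl_consume_produce.py. I have configured my AIOKafkaConsumer and AIOKafkaProducer with what I believe are the correct parameters, but I get the following runtime error: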

/Users/galen/opt/anaconda3/envs/ds/bin/python /Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py
Traceback (most recent call last):
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 57, in <module>
    loop.run_until_complete(task)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 52, in <module>
    loop.run_until_complete(task)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 23, in produce_and_consume
    await producer.start()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 171, in start
    await self.client.bootstrap()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/client.py", line 203, in bootstrap
    version_hint=version_hint)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 90, in create_conn
    await conn.connect()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 214, in connect
    await self._do_sasl_handshake()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 281, in _do_sasl_handshake
    payload, expect_response = res
RuntimeError: await wasn't used with future
Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7f9bc818d350>

My modified version of the code is:

import asyncio
from ssl import create_default_context, Purpose
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from kafka.common import TopicPartition

import ccloud_lib

ssl_context = create_default_context(Purpose.SERVER_AUTH, cafile='cacert.pem')
conf = ccloud_lib.read_ccloud_config('kafka_config.conf')

async def produce_and_consume(loop):
    # Produce
    producer = AIOKafkaProducer(
        bootstrap_servers=conf['bootstrap.servers'],
        loop=loop,
        security_protocol='SASL_SSL',
        ssl_context=ssl_context,
        sasl_mechanism='PLAIN',
        sasl_plain_password=conf['sasl.password'],
        sasl_plain_username=conf['sasl.username']
    )

    await producer.start()
    try:
        msg = await producer.send_and_wait(
            'my_topic', b"Super Message", partition=0)
    finally:
        await producer.stop()

    consumer = AIOKafkaConsumer(
        bootstrap_servers=conf['bootstrap.servers'],
        loop=loop,
        ssl_context=ssl_context,
        security_protocol='SASL_SSL',
        sasl_mechanism='PLAIN',
        sasl_plain_password=conf['sasl.password'],
        sasl_plain_username=conf['sasl.username']
    )
    await consumer.start()
    try:
        consumer.seek(TopicPartition('my_topic', 0), msg.offset)
        fetch_msg = await consumer.getone()
    finally:
        await consumer.stop()

    print("Success", msg, fetch_msg)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    task = loop.create_task(produce_and_consume(loop))
    try:
        loop.run_until_complete(task)
    finally:
        loop.run_until_complete(asyncio.sleep(0, loop=loop))
        task.cancel()
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            pass
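
The producer and consumer above repeat the same connection arguments. A minimal sketch of one way to build them once, assuming `conf` is a plain dict keyed by the property names from the config file. `build_aiokafka_kwargs` is a hypothetical helper for illustration, not part of aiokafka or ccloud_lib, and the values in `example_conf` are placeholders:

```python
import ssl


def build_aiokafka_kwargs(conf, ssl_context):
    """Map Confluent-style property names onto the keyword arguments
    that the question passes to AIOKafkaProducer / AIOKafkaConsumer.

    Hypothetical helper; `conf` is assumed to be a plain dict.
    """
    return {
        'bootstrap_servers': conf['bootstrap.servers'],
        # Fall back to the values hard-coded in the question if the
        # file does not define these two properties.
        'security_protocol': conf.get('security.protocol', 'SASL_SSL'),
        'sasl_mechanism': conf.get('sasl.mechanism', 'PLAIN'),
        'sasl_plain_username': conf['sasl.username'],
        'sasl_plain_password': conf['sasl.password'],
        'ssl_context': ssl_context,
    }


# Example with placeholder values (not real credentials or hosts).
example_conf = {
    'bootstrap.servers': 'broker.example.invalid:9092',
    'sasl.username': 'API_KEY',
    'sasl.password': 'API_SECRET',
}
kwargs = build_aiokafka_kwargs(example_conf, ssl.create_default_context())
```

The resulting dict could then be unpacked into both constructors, for example `AIOKafkaProducer(**kwargs)`, so the producer and consumer cannot drift apart in their security settings.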

My obfuscated configuration file, kafka_config.conf, looks like this:

bootstrap.servers=*****.us-central1.gcp.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="********************" password\="****************************************";
sasl.username=********************
sasl.password=********************************************************************************
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=********************:********************
schema.registry.url=https://********************.us-central1.gcp.confluent.cloud
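
The `ccloud_lib` module is not shown in the question. A minimal sketch of a reader for a properties file like the one above, assuming each non-comment line has the form `key=value` and that only the first `=` separates key from value (the `sasl.jaas.config` line contains further `=` characters). `read_properties` is a hypothetical stand-in, not the actual `ccloud_lib.read_ccloud_config`, and the sample lines are placeholders:

```python
def read_properties(lines):
    """Parse key=value lines into a dict, skipping blanks and # comments."""
    conf = {}
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith('#'):
            continue
        # Split on the first '=' only, so values may contain '='.
        key, _, value = line.partition('=')
        conf[key.strip()] = value.strip()
    return conf


# Placeholder input; in practice the lines would come from open(path).
sample = [
    '# comment',
    'bootstrap.servers=broker.example.invalid:9092',
    'sasl.jaas.config=Module required username\\="key" password\\="secret";',
    '',
]
parsed = read_properties(sample)
```

Printing the keys of the parsed dict is a quick way to confirm that `sasl.username` and `sasl.password` are actually present before they are passed to the client.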

Is it possible to connect to Confluent Cloud with the AIOKafka client? Is anything in my configuration incorrect?

