基于合流python库的自以为是的kafka python客户端
kafkian的Python项目详细描述
卡夫卡
kafkian是一个固执己见的高级消费者和生产者 confluent-kafka-python/librdkafka 部分灵感来自confluent_kafka_helpers。 主要用于cqrs/eventsourced系统中 仅限于生成和使用编码消息。
kafkian部分地模仿了kafka java api,部分地更像pythonic,部分地就像维护者喜欢它一样。
而不是通过属性配置所有的东西,大多数的东西 计划明确配置,并且可能通过依赖项 注射更容易测试。两个生产者的配置字典 消费者直接被传递到潜在的融合生产者和 消费者,隐藏在门面后面
库提供了一个基本序列化程序和反序列化程序类,以及
他们专门的avro子类,AvroSerializer
和AvroDeserializer
。
这允许有一个简单的字符串密钥和avro编码的消息,
反之亦然。通常使用avro编码的字符串作为密钥,因为
为此,我们提供AvroStringKeySerializer
与合流库不同,我们支持提供特定的Avro模式
与消息一起,就像kafka java api一样。架构可以是
自动注册到模式注册表,我们还提供了三个
SubjectNameStrategy
,再次与kafka java api兼容。
用法
生成消息
1.初始化生产者
fromkafkianimportProducerfromkafkian.serde.serializationimportAvroSerializer,AvroStringKeySerializer,SubjectNameStrategyproducer=Producer({'bootstrap.servers':config.KAFKA_BOOTSTRAP_SERVERS,},key_serializer=AvroStringKeySerializer(schema_registry_url=config.SCHEMA_REGISTRY_URL),value_serializer=AvroSerializer(schema_registry_url=config.SCHEMA_REGISTRY_URL,subject_name_strategy=SubjectNameStrategy.RecordNameStrategy))
2.定义消息架构
fromconfluent_kafkaimportavrofromkafkian.serde.avroserdebaseimportAvroRecordvalue_schema_str="""{ "namespace": "auth.users", "name": "UserCreated", "type": "record", "fields" : [ { "name" : "uuid", "type" : "string" }, { "name" : "name", "type" : "string" }, { "name": "timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" } } ]}"""classUserCreated(AvroRecord):_schema=avro.loads(value_schema_str)
3生成消息
producer.produce("auth.users.events",user.uuid,UserCreated({"uuid":user.uuid,"name":user.name,"timestamp":int(user.timestamp.timestamp()*1000)}),sync=True)
消费信息
1.初始化耗电元件
CONSUMER_CONFIG={'bootstrap.servers':config.KAFKA_BOOTSTRAP_SERVERS,'default.topic.config':{'auto.offset.reset':'latest',},'group.id':'notifications'}consumer=Consumer(CONSUMER_CONFIG,topics=["auth.users.events"],key_deserializer=AvroDeserializer(schema_registry_url=config.SCHEMA_REGISTRY_URL),value_deserializer=AvroDeserializer(schema_registry_url=config.SCHEMA_REGISTRY_URL),)
2.通过生成器使用消息
formessageinconsumer:handle_message(message)consumer.commit()
这里,message
是由
汇合kafka python,通过.key()
访问解码的密钥和值
分别是.value()
。
键和值都包装在动态生成的类中,
其全名与相应的avro架构全名相同。
在上面的示例中,该值将具有名为auth.users.UserCreated
的类。
使用的消息键和值的avro模式可以通过.schema
属性访问。
贡献
如前所述,这个图书馆很固执己见,不过,我愿意接受建议。 把你的问题和建议写在github上!
运行测试
提供了单元和系统测试。
要运行单元测试,请安装需求并仅运行
py.test tests/unit/
要运行系统测试,kafka集群和模式注册表是 必修的。提供了docker compose文件,只需运行
docker-compose up
一旦集群启动并运行,通过
py.test tests/system/