基于合流python库的自以为是的kafka python客户端

kafkian的Python项目详细描述


卡夫卡

Build StatuscodecovPyPI

kafkian是一个固执己见的高级消费者和生产者 confluent-kafka-python/librdkafka 部分灵感来自confluent_kafka_helpers。 主要用于cqrs/eventsourced系统中 仅限于生成和使用编码消息。

kafkian部分地模仿了kafka java api,部分地更像pythonic,部分地就像维护者喜欢它一样。

而不是通过属性配置所有的东西,大多数的东西 计划明确配置,并且可能通过依赖项 注射更容易测试。两个生产者的配置字典 消费者直接被传递到潜在的融合生产者和 消费者,隐藏在门面后面

库提供了一个基本序列化程序和反序列化程序类,以及 他们专门的avro子类,AvroSerializerAvroDeserializer。 这允许有一个简单的字符串密钥和avro编码的消息, 反之亦然。通常使用avro编码的字符串作为密钥,因为 为此,我们提供AvroStringKeySerializer

与合流库不同,我们支持提供特定的Avro模式 与消息一起,就像kafka java api一样。架构可以是 自动注册到模式注册表,我们还提供了三个 SubjectNameStrategy,再次与kafka java api兼容。

用法

生成消息

1.初始化生产者

fromkafkianimportProducerfromkafkian.serde.serializationimportAvroSerializer,AvroStringKeySerializer,SubjectNameStrategyproducer=Producer({'bootstrap.servers':config.KAFKA_BOOTSTRAP_SERVERS,},key_serializer=AvroStringKeySerializer(schema_registry_url=config.SCHEMA_REGISTRY_URL),value_serializer=AvroSerializer(schema_registry_url=config.SCHEMA_REGISTRY_URL,subject_name_strategy=SubjectNameStrategy.RecordNameStrategy))

2.定义消息架构

fromconfluent_kafkaimportavrofromkafkian.serde.avroserdebaseimportAvroRecordvalue_schema_str="""{   "namespace": "auth.users",   "name": "UserCreated",   "type": "record",   "fields" : [     {       "name" : "uuid",       "type" : "string"     },          {       "name" : "name",       "type" : "string"     },     {        "name": "timestamp",        "type": {            "type": "long",            "logicalType": "timestamp-millis"        }     }   ]}"""classUserCreated(AvroRecord):_schema=avro.loads(value_schema_str)

3生成消息

producer.produce("auth.users.events",user.uuid,UserCreated({"uuid":user.uuid,"name":user.name,"timestamp":int(user.timestamp.timestamp()*1000)}),sync=True)

消费信息

1.初始化耗电元件

CONSUMER_CONFIG={'bootstrap.servers':config.KAFKA_BOOTSTRAP_SERVERS,'default.topic.config':{'auto.offset.reset':'latest',},'group.id':'notifications'}consumer=Consumer(CONSUMER_CONFIG,topics=["auth.users.events"],key_deserializer=AvroDeserializer(schema_registry_url=config.SCHEMA_REGISTRY_URL),value_deserializer=AvroDeserializer(schema_registry_url=config.SCHEMA_REGISTRY_URL),)

2.通过生成器使用消息

formessageinconsumer:handle_message(message)consumer.commit()

这里,message是由 汇合kafka python,通过.key()访问解码的密钥和值 分别是.value()

键和值都包装在动态生成的类中, 其全名与相应的avro架构全名相同。 在上面的示例中,该值将具有名为auth.users.UserCreated的类。

使用的消息键和值的avro模式可以通过.schema属性访问。

贡献

如前所述,这个图书馆很固执己见,不过,我愿意接受建议。 把你的问题和建议写在github上!

运行测试

提供了单元和系统测试。

要运行单元测试,请安装需求并仅运行

py.test tests/unit/

要运行系统测试,kafka集群和模式注册表是 必修的。提供了docker compose文件,只需运行

docker-compose up

一旦集群启动并运行,通过

py.test tests/system/

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java InputStream对象在声明后关闭   java未定义名为“transactionManager”的bean重命名transactionManager   java“jar”命令何时会拒绝将类添加到java中。jar文件?   java JPA标准依赖WHERE子句   安卓中从SD卡读取文本文件时出现java错误   java直接启用类似位置的权限   使用@WebMvcTest和Mockito-BDDMockito对SpringBoot-RestController进行java测试   java JSESSIONID存储在哪里?   java jtextarea鼠标事件覆盖容器鼠标事件   java DRL无法解析动态加载的类   java是从一个方法返回多个对象的最简单方法   java自定义按钮/编辑框是否不可见?   java GUI如何在保存用户输入的同时在面板或框架之间切换   swing Java自定义JSlider不会更新   GridBagLayout中的java超过1个JPanel   java从ProjectReactor中的flux中采样除第一个元素外的所有元素   Java泛型和泛型类型   Java代码生成宽指令的jvm