结构Python消息总线库
fabric-message-bus的Python项目详细描述
消息总线模式和消息定义
结构消息总线模式的基本框架和参与者间通信的消息
概述
使用apachekafka实现了控制和测量框架中不同参与者之间的结构通信。在
apachekafka是一个为流而设计的分布式系统。它被构建为容错、高吞吐量、水平扩展,并允许地理分布的数据流和流处理应用程序。在
Kafka支持各种参与者/服务的事件驱动实现。事件既是事实又是触发因素。每个fabric参与者都将是一个主题的制作人,遵循单作者原则,并订阅其他参与者的主题进行交流。消息通过Kafka使用Apache Avro数据序列化系统进行交换。在
要求
- Python 3.7+
- 融合的卡夫卡
- 融合的卡夫卡
安装
$ pip3 install .
使用
这个包实现了生产者/消费者api的接口,以通过Avro序列化向Kafka推送/读取消息。在
消息和模式
用户应继承IMessage类(消息.py)定义它自己的成员并覆盖到_dict()函数。它还需要定义与派生类相关的相应AVRO模式。这种新的模式应该在生产者和消费者中使用。在
基本IMessage类的示例模式在(schema)中可用/消息.avsc)在
生产者
AvroProducerApi类实现avrokafka生产者的基本功能。用户应继承此类并重写delivery_report方法以处理异步生成的消息传递。在
在结束时可用的用法示例producer.py在
消费者
AvroConsumerApi类实现avrokafka消费者的基本功能。用户应继承此类并重写process\u message方法来处理传入消息的消息处理。在
在结束时可用的用法示例消费者.py在
管理API
AdminApi类提供对执行基本管理功能(如创建/删除主题/分区等)的支持
如何启动一个测试Kafka集群进行测试
生成凭证
您必须生成CA证书(如果已经有CA证书,则使用您的证书),然后为代理和客户机生成密钥库和信任库。在
^{pr2}$设置secrets目录的环境变量。这在以后的命令中使用。确保您在MessageBus目录中。在
export KAFKA_SSL_SECRETS_DIR=$(pwd)/secrets
打开容器
你可以使用docker-合成.yaml文件来启动一个包含
- 经纪人
- 动物园管理员
- 架构注册表
使用下面的命令启动集群
docker-compose up -d
这将产生以下容器:
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
189ba0e70b97 confluentinc/cp-schema-registry:latest "/etc/confluent/dock…" 58 seconds ago Up 58 seconds 0.0.0.0:8081->8081/tcp schemaregistry
49616f1c9b0a confluentinc/cp-kafka:latest "/etc/confluent/dock…" 59 seconds ago Up 58 seconds 0.0.0.0:9092->9092/tcp, 0.0.0.0:19092->19092/tcp broker1
c9d19c82558d confluentinc/cp-zookeeper:latest "/etc/confluent/dock…" 59 seconds ago Up 59 seconds 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp zookeeper
- 项目
标签: