结构Python消息总线库

fabric-message-bus的Python项目详细描述


消息总线模式和消息定义

结构消息总线模式的基本框架和参与者间通信的消息

概述

使用apachekafka实现了控制和测量框架中不同参与者之间的结构通信。在

apachekafka是一个为流而设计的分布式系统。它被构建为容错、高吞吐量、水平扩展,并允许地理分布的数据流和流处理应用程序。在

Kafka支持各种参与者/服务的事件驱动实现。事件既是事实又是触发因素。每个fabric参与者都将是一个主题的制作人,遵循单作者原则,并订阅其他参与者的主题进行交流。消息通过Kafka使用Apache Avro数据序列化系统进行交换。在

要求

  • Python 3.7+
  • 融合的卡夫卡
  • 融合的卡夫卡

安装

$ pip3 install .

使用

这个包实现了生产者/消费者api的接口,以通过Avro序列化向Kafka推送/读取消息。在

消息和模式

用户应继承IMessage类(消息.py)定义它自己的成员并覆盖到_dict()函数。它还需要定义与派生类相关的相应AVRO模式。这种新的模式应该在生产者和消费者中使用。在

基本IMessage类的示例模式在(schema)中可用/消息.avsc)在

生产者

AvroProducerApi类实现avrokafka生产者的基本功能。用户应继承此类并重写delivery_report方法以处理异步生成的消息传递。在

在结束时可用的用法示例producer.py在

消费者

AvroConsumerApi类实现avrokafka消费者的基本功能。用户应继承此类并重写process\u message方法来处理传入消息的消息处理。在

在结束时可用的用法示例消费者.py在

管理API

AdminApi类提供对执行基本管理功能(如创建/删除主题/分区等)的支持

如何启动一个测试Kafka集群进行测试

生成凭证

您必须生成CA证书(如果已经有CA证书,则使用您的证书),然后为代理和客户机生成密钥库和信任库。在

^{pr2}$

设置secrets目录的环境变量。这在以后的命令中使用。确保您在MessageBus目录中。在

export KAFKA_SSL_SECRETS_DIR=$(pwd)/secrets

打开容器

你可以使用docker-合成.yaml文件来启动一个包含

  • 经纪人
  • 动物园管理员
  • 架构注册表

使用下面的命令启动集群

docker-compose up -d

这将产生以下容器:

docker ps
CONTAINER ID        IMAGE                                    COMMAND                  CREATED             STATUS              PORTS                                                                                        NAMES
189ba0e70b97        confluentinc/cp-schema-registry:latest   "/etc/confluent/dock…"   58 seconds ago      Up 58 seconds       0.0.0.0:8081->8081/tcp                                                                       schemaregistry
49616f1c9b0a        confluentinc/cp-kafka:latest             "/etc/confluent/dock…"   59 seconds ago      Up 58 seconds       0.0.0.0:9092->9092/tcp, 0.0.0.0:19092->19092/tcp                                             broker1
c9d19c82558d        confluentinc/cp-zookeeper:latest         "/etc/confluent/dock…"   59 seconds ago      Up 59 seconds       2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp                                                   zookeeper

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java无法使用JSF访问托管bean方法   java是制作具有多值类型的HashMap的正确方法   javafx中TicTacToe的java更新UI   windows Java文件。getCanonicalFile()无法处理冒号“:”   java在一个布局屏幕中创建多个(26)按钮   java Android Studio:Gradle构建完成,有251个错误   我们如何在Java上为callfireapiclient编写单元/集成测试?   java无法将1715UTC转换为本地/gmt类型   具有已定义的数字序列的JAVA循环   Java程序正在netbeans中编译,但未在CMD中编译,包不存在   java Android构造函数和onCreate()之间有什么区别?   java配置弹性搜索结果评分   java LibGDX纹理是否可绘制?   java如何在Android中设置应用程序默认打开pdf   java是否有一种创造性的方法将多个参数传递给contentEquals()方法?   java在Android上存储Ed25519私钥