没有项目描述

buslane的Python项目详细描述


公共汽车道

Build StatusLicenseVersionPython versions

简单消息(事件/命令)总线。

安装

要安装buslane,只需使用pip(或pipenv):

$ pip install buslane

要求

buslane支持的最低python版本是3.6。

快速启动

fromdataclassesimportdataclassfrombuslane.commandsimportCommand,CommandHandler,CommandBus@dataclassclassRegisterUserCommand(Command):email:strpassword:strclassRegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):defhandle(self,command:RegisterUserCommand)->None:assertcommand==RegisterUserCommand(email='john@lennon.com',password='secret',)command_bus=CommandBus()command_bus.register(handler=RegisterUserCommandHandler())command_bus.execute(command=RegisterUserCommand(email='john@lennon.com',password='secret',))

关于

此库使基于消息创建解决方案更加容易。如果要将事件发生与 处理,buslane是一种方法。它支持命令(单个处理程序)和事件(0或多个处理程序) 接近。

动机

这个包可能会被一个简单的python字典所替代,该字典使用消息类作为键,并使用普通的 作为值使用。python是一种动态语言,我们可以很容易地实现这样的解决方案,不需要任何类, 继承,方法重写等等。那你为什么要用buslane?是不是pythonic方法?

首先,buslane是一个非常简单和微小的项目。我把这几行从一个项目复制到另一个项目,所以现在我 不用了。

其次,我非常喜欢类型注释。这是一个具有巨大代码库的项目中的游戏改变者。buslane有 在任何地方输入提示,它都基于Python generics。与诸如 ^{}您将确保(从类型的角度)一切正常。

消息处理程序是类而不是函数,因为您可以通过__init__轻松地注入依赖项。 参数。这样的类很容易测试,您不需要mock.patch所有内容。界面清晰。

最后但并非最不重要的是,buslaneapi简单且定义良好。您可以轻松地扩展它,例如记录所有 消息或将其存储在数据库中。

它可以作为CQRS系统的基础。

参考

buslane使用python类型注释正确注册处理程序。要创建您必须继承的邮件 EventCommand类。处理程序应该继承自EventHandler[T]CommandHandler[T],其中T是一个类 你的信息。

事件

可以为单个事件注册多个或无处理程序。

课程:

  • Event
  • EventHandler[Event]
  • EventBus

例外情况:

  • WrongHandlerException

示例

frombuslane.eventsimportEvent,EventHandler,EventBusclassSampleEvent(Event):passclassSampleEventHandler(EventHandler[SampleEvent]):defhandle(self,event:SampleEvent)->None:passevent_bus=EventBus()event_bus.register(handler=SampleEventHandler())event_bus.publish(event=SampleEvent())

命令

您只需为给定的命令注册一个处理程序。

课程:

  • Command
  • CommandHandler[Command]
  • CommandBus

例外情况:

  • MissingCommandHandlerException
  • CommandHandlerRegisteredException
  • WrongHandlerException

示例

frombuslane.commandsimportCommand,CommandHandler,CommandBusclassSampleCommand(Command):passclassSampleCommandHandler(CommandHandler[SampleCommand]):defhandle(self,command:SampleCommand)->None:passcommand_bus=CommandBus()command_bus.register(handler=SampleCommandHandler())command_bus.execute(command=SampleCommand())

定制

如果要自定义总线的行为,可以从EventBus/CommandBus类重写handle方法。

下面的示例显示了一个总线,该总线记录每个事件并在线程中对其进行处理。

importloggingfromconcurrent.futuresimportThreadPoolExecutorclassCustomEventBus(EventBus):def__init__(self,workers:int)->None:super().__init__()self.logger=logging.getLogger()self.executor=ThreadPoolExecutor(max_workers=workers)defhandle(self,event:Event,handler:EventHandler)->None:self.logger.info(f'Handling event {event} by {handler}')self.executor.submit(handler.handle,event)

作者

Konrad Hałas创建。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java的目标是从我的项目中删除不起作用的文件   java对for循环的理解   java我完成了在作业要求的位置查找字符的部分,但是如何从我找到的字符串中删除字符呢?   基于帧时的java动画   java请求无效。缺少XGoogUploadCommand标头   java如何在viewsource模式下使用openStream?   grpc grpc_ARG_KEEPALIVE_PERMIT_而不调用java服务器?   java如何通过Junit测试Web服务调用   如何在java中获取鼠标中键?   使用junit在spring测试中加载属性文件   Java中用于类的类修饰符   java多色文本图像   sql无法调试Java中的“连接到数据库失败”异常   java如何指定hibernate连接映射?   java Android工具栏不显示   java仿射转换不同的图形对象   使用终端的java问题   Java在tomcat上查找127.0.0.1失败