MessageBus/CommandBus模式的简单实现

pymessagebus的Python项目详细描述


PyMessageBus

python的消息/命令总线

"https://www://travis-ci.org/drbenton/pymessagebus"rel rel="nofollow"><<<<< >

PyMessageBus是消息总线库。它带有一个通用的MessageBus类,以及一个更专门的CommandBus类。

注意:这里的"消息总线"和"命令总线"指的是一种设计模式,与rabbitmq这样的消息传递系统无关。(即使它们可以一起使用)

我创建它是因为我在symfony应用程序上使用这种设计模式已经很多年了,我从来没有失望过——这是一种非常简单有效的方法,可以将业务操作与实现分离开来。

您可以查看以下URL以了解有关此设计模式的更多信息:

安装

啊!

概要

一个简单的例子,说明commandbus如何使业务操作(命令)与它们的效果实现(命令处理程序)保持分离:

啊!

API

消息总线

当 给定类型的消息在总线上发送。
结果是一个结果数组,其中每个项都是一个处理程序执行的结果。 啊!

因此,这个api非常简单(您可以在api模块中看到它是一个抽象类:

  • 添加处理程序(message u class:type,message u handler:t.callable)>;无添加一个处理程序,当向其发送此类消息时,该处理程序将由总线实例触发。
  • 句柄(消息:对象)>;t.list[t.any]触发以前为该消息类注册的句柄。如果没有为此类邮件注册处理程序,则返回空列表。
  • 具有处理程序(消息类:类型)>;bool只允许一个检查i为给定的消息类注册了一个或多个处理程序。

命令总线

commandbus是一个特殊版本的消息总线(从技术上讲,它只是消息总线上的一个代理,它增加了对这些特殊性的管理),具有以下微妙之处:

  • 对于给定的消息类,只能注册一个处理程序
  • 当通过handle方法将消息发送到总线时,如果没有为此消息类注册处理程序,则会引发错误。

简而言之,命令总线假设对于我们发送给它的每一个业务操作触发的处理程序都是必需的—只有一个。

因此,API与MessageBus完全相同,但有以下技术差异:

  • 如果试图为以前已为其注册了另一个处理程序的消息类注册处理程序,则add\u handler(message\class,handler)方法将引发api.commandhandleralreadyregistedforatype异常。
  • handle(message)方法返回单个结果,而不是结果列表(因为对于给定的消息类,我们可以而且必须只有一个处理程序)。如果没有为此消息类注册任何处理程序,则会引发一个api.commandhandlernotfound异常。
commandbus的附加选项

CommandBus构造函数有其他选项,您可以使用这些选项自定义其行为:

  • 允许结果:当类被实例化时,可以使用允许结果=true命名参数(默认值为false),从而对commandbus模式的实现更加严格。
    在这种情况下,句柄(消息)的结果将始终是none。通过这样做,我们可以遵循更纯粹的设计模式。(并通过应用程序存储库访问命令处理的结果,例如,将预先生成的id附加到消息中)
  • 锁定:默认情况下,如果在仍处理另一条消息的情况下向CommandBus发送消息(如果其中一个命令处理程序向总线发送消息,则可能发生此情况),CommandBusalReadyProcessingMessage将引发API异常。
    可以通过设置命名参数locking=false(默认值为true)来禁用此行为。

中间产品

最后但并非最不重要的是,这两种巴士都可以接受中间商。

中间件是接收消息(发送到总线)作为其第一个参数的函数,以及作为第二个参数的"下一个中间件"函数。该函数可以在触发下一个中间件(或为此类消息注册的处理程序的执行)之前或/之后执行一些自定义处理。

中间产品以"洋葱形状"触发:在两个中间产品的情况下,例如:

  • 第一个注册的中间件"预处理"将首先执行
  • 第二个在
  • 然后执行为该消息类注册的处理程序(它是洋葱的核心)

然后我们从洋葱里出来,方向相反:

  • 第二个中间件"后处理"发生
  • 触发第一个中间件"后处理"
  • 如果最终返回结果

middleware可以更改发送到下一个middleware(或消息处理程序)的消息,但它们也可以执行一些不影响消息的处理(例如日志记录)。

下面是一段说明:

classMessageWithList(t.NamedTuple):payload:t.List[str]defmiddleware_one(message:MessageWithList,next:api.CallNextMiddleware):message.payload.append("middleware one: does something before the handler")result=next(message)message.payload.append("middleware one: does something after the handler")returnresultdefmiddleware_two(message:MessageWithList,next:api.CallNextMiddleware):message.payload.append("middleware two: does something before the handler")result=next(message)message.payload.append("middleware two: does something after the handler")returnresultdefhandler(message:MessageWithList)->str:message.payload.append("handler does something")return"handler result"message_bus=MessageBus(middlewares=[middleware_one,middleware_two])message_bus.add_handler(MessageWithList,handler)message=MessageWithList(payload=["initial message payload"])result=sut.handle(message)assertmessage.payload==["initial message payload","middleware one: does something before the handler","middleware two: does something before the handler","handler does something","middleware two: does something after the handler","middleware one: does something after the handler",]assertresult=="handler result"

日志中间件

为了方便起见,软件包附带了一个"日志"中间件。

概要

importloggingfrompymessagebus.middleware.loggerimportget_logger_middlewarelogger=logging.getLogger("message_bus")logging_middleware=get_logger_middleware(logger)message_bus=MessageBus(middlewares=[logging_middleware])# Now you will get logging messages:#  - when a message is sent on the bus (default logging level: DEBUG)#  - when a message has been successfully handled by the bus, with no Exception raised (default logging level: DEBUG)#  - when the processing of a message has raised an Exception (default logging level: ERROR)

您可以定制中间件vi的日志级别aLoggingMiddleWareConfig类:

importloggingfrompymessagebus.middleware.loggerimportget_logger_middleware,LoggingMiddlewareConfiglogger=logging.getLogger("message_bus")logging_middleware_config=LoggingMiddlewareConfig(mgs_received_level=logging.INFO,mgs_succeeded_level=logging.INFO,mgs_failed_level=logging.CRITICAL)logging_middleware=get_logger_middleware(logger,logging_middleware_config)

"默认"单例

因为这些总线的大多数用例都依赖于总线的单个实例,对于Commodity,您还可以对MessageBus和CommandBus使用单例,可以从"默认"子包访问它们。

这些版本还公开了一个非常方便的register\u处理程序(消息类:type)decorator。

概要:

# domain.pyimporttypingastclassCreateCustomerCommand(t.NamedTuple):first_name:strlast_name:str# command_handlers.pyfrompymessagebus.defaultimportcommandbusimportdomain@commandbus.register_handler(domain.CreateCustomerCommand)defhandle_customer_creation(command)->int:customer=OrmCustomer()customer.full_name=f"{command.first_name}{command.last_name}"customer.creation_date=datetime.now()customer.save()returncustomer.id# api.pyfrompymessagebus.defaultimportcommandbusimportdomain@post("/customer)defpost_customer(params):# Note that the implmentation (the "handle_customer_creation" function)# is completely invisible here, we only know about the (agnostic) CommandBus# and the class that describe the business action (the Command)command=CreateCustomerCommand(params["first_name"],params["last_name"])customer_id=command_bus.handle(command)returncustomer_id

您可以注意到,与第一个概要的不同之处在于,在这里我们不必实例化commandbus,并且使用decorator自动向它注册handle_customer_creation函数。

代码质量

代码本身用黑色格式化,并用pylint和mypy检查。

整个软件包附带一个完整的测试套件,由pytest管理。

$ make test

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
Java泛型重写抽象方法并具有子类的返回类型   Java中的字符串反转字符,同时保留一些字符   java将系统时间与我获取它的时间进行比较   java解析ODATA URL以在准备entityset之前读取ID值   java中的有界通配符下界泛型即使在传递超类时也不会编译   c#Java的JVM和Java的内部工作方式有什么不同。NET的CLR?   java如何在windows7上指定JDK的版本?   Java:列出单个目录中的所有文件(1020000+)   java使用Logback和Lombok   安卓谷歌玩java。lang.NullPointerException   使用RSA的解密结果在普通Java和Android中有所不同   具有默认连接池的java Spring引导   java我如何在一个坏的测试环境中前进?