使用RabbitMQ等代理构建管道的消息交换引擎

barterdude的Python项目详细描述


调酒师

Build StatusCoverage Status

消息交换引擎使用RabbitMQ这样的代理来构建管道。这个项目是建立在伟大的async-worker之上的。在

barter

安装

使用Python3.6+

pip install barterdude

使用

使用以下完整示例构建您的消费者:

^{pr2}$

建立自己的钩子

底座挂钩(简单型)

这些钩子在消息检索、成功和失败时调用。在

frombarterdude.hooksimportBaseHookfromasyncworker.rabbitmq.messageimportRabbitMQMessageclassMyCounterHook(BaseHook):_consume=_fail=_success=0asyncdefon_success(self,message:RabbitMQMessage):self._success+=1asyncdefon_fail(self,message:RabbitMQMessage,error:Exception):self._fail+=1asyncdefbefore_consume(self,message:RabbitMQMessage):self._consume+=1

Http Hook(开放路由)

这些钩子可以完成简单钩子所做的所有事情,但对路由做出响应。在

fromaiohttpimportwebfrombarterdude.hooksimportHttpHookfromasyncworker.rabbitmq.messageimportRabbitMQMessageclassMyCounterHttpHook(HttpHook):_consume=_fail=_success=0asyncdef__call__(self,req:web.Request):returnweb.json_response(dict(consumed=self._consume,success=self._success,fail=self._fail))asyncdefon_success(self,message:RabbitMQMessage):self._success+=1asyncdefon_fail(self,message:RabbitMQMessage,error:Exception):self._fail+=1asyncdefbefore_consume(self,message:RabbitMQMessage):self._consume+=1

数据共享

遵循async-workeraiohttp中的方法, BarterDude不鼓励使用全局变量,也就是单例变量。在

要在应用程序中全局共享数据状态,BarterDude的行为类似于dict。 例如,可以在BarterDude实例中保存类似全局的变量:

frombarterdudeimportBarterDudebarterdude=BarterDude()baterdude["my_variable"]=data

把它带回消费者那里

asyncdefconsumer_access_storage(msg):data=baterdude["my_variable"]

模式验证

使用的消息可以通过json架构进行验证:

@barterdude.consume_amqp(["queue1","queue2"],monitor,validation_schema={"$schema":"http://json-schema.org/draft-04/schema#","$id":"http://example.com/example.json","type":"object","title":"Message Schema","description":"The root schema comprises the entire JSON document.","additionalProperties":True,"required":["key"],"properties":{"key":{"$id":"#/properties/key","type":"string","title":"The Key Schema","description":"An explanation about message.","default":""}}},requeue_on_validation_fail=False# invalid messages are removed without requeue)

您仍然可以在生成消息之前或在需要时验证消息:

frombarterdude.messageimportMessageValidationvalidator=MessageValidation(json_schema)validator.validate(message)

数据保护

Barterdude考虑了GDPR数据保护,默认情况下不记录消息体,但是您可以使用环境变量BARTERDUDE_LOG_REDACTED=0停用它

现在消息体将被日志钩子记录。在

此配置只控制BarterDude的默认日志挂钩,对用户自定义用户日志没有影响。如果要使用此配置控制日志,请使用:

frombaterdude.confimportBARTERDUDE_LOG_REDACTED

测试

为了测试异步使用者,我们建议使用asynctestlib

fromasynctestimportTestCaseclassTestMain(TestCase):deftest_should_pass(self):self.assertTrue(True)

我们希望你喜欢!:传情动漫:

贡献

对于开发和贡献,请遵循Contributing GuideALWAYS尊重Code of Conduct

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
SpringWeb中的java更新/通知其他用户   java Lambda性能测试   java Bukkit插件:空白符号   java在按下按钮后改变彩色正方形的大小   javajavac相当于“D”?   java序列化接口   属性无法从属性文件返回值   java我应该使用什么查询来使用Jsoup从html页面提取符号?   java Android Studio项目结构问题   JAVA方法和返回值/公共变量(基础)   java将NativeQuery映射到POJO   java如何在下面的程序中消除NumberFormatException?   在java中获取链表与数组中的对象   java Android Firebase将用户发送到聊天室