一种流处理的流水线框架

tanbih-pipeline的Python项目详细描述


https://badge.fury.io/py/tanbih-pipeline.svgDocumentation Status

支持RabbitMQ、Pulsar、Kafka和Redis的灵活流处理框架。在

特点

  • 每封邮件至少有一次确认
  • 通过消费者群体横向扩展
  • 在部署中控制流程,开发一次,在任何地方使用它
  • 提供文件和内存输入/输出的可测试性

参数

  • kind-指定管道的下划线技术,例如KAFKA或RabbitMQ
  • MEM—基于内存的队列(适用于unittests)
  • 基于文件的队列(适用于开发和集成测试)

发电机

生成器将在我们的管道中开发数据源时使用。消息来源 会产生没有输入的输出。爬虫可以看作是发电机。在

>>>frompipelineimportGenerator,Message>>>>>>classMyGenerator(Generator):...defgenerate(self):...foriinrange(10):...yield{'id':i}>>>>>>generator=MyGenerator('generator','0.1.0',description='simple generator')>>>generator.parse_args("--kind MEM --out-topic test".split())>>>generator.start()>>>[r.get('id')forringenerator.destination.results][0,1,2,3,4,5,6,7,8,9]

处理器

处理器用于处理输入。将进行修改。处理器 可以为每个输入产生一个输出,或者没有输出。在

^{pr2}$

分路器

当写入多个输出时,将使用拆分器。它需要一个函数 根据处理消息生成输出主题,并在编写输出时使用它。在

>>>frompipelineimportSplitter,Message>>>>>>classMySplitter(Splitter):...defget_topic(self,msg):...return'{}-{}'.format(self.destination.topic,msg.get('id'))......defprocess(self,msg):...msg.update({...'processed':True,...})...returnNone>>>>>>splitter=MySplitter('splitter','0.1.0',description='simple splitter')>>>config={'data':[{'id':1}]}>>>splitter.parse_args("--kind MEM --in-topic test --out-topic test".split(),config=config)>>>splitter.start()>>>[r.get('id')forrinsplitter.destinations['test-1'].results][1]

使用

选择要从中生成子类的生成器、处理器或拆分器。在

环境变量

应用程序接受以下环境变量:

environment variablecommand line argumentoptions
PIPELINE–kindKAFKA, PULSAR, FILE
PULSAR–pulsarpulsar url
TENANT–tenantpulsar tenant
NAMESPACE–namespacepulsar namespace
SUBSCRIPTION–subscriptionpulsar subscription
KAFKA–kafkakafka url
GROUPID–group-idkafka group id
INTOPIC–in-topictopic to read
OUTTOPIC–out-topictopic to write to

自定义代码

定义add_arguments以向worker添加新参数。在

定义安装程序以在辅助进程开始处理消息之前运行初始化代码。安装程序在之后调用 已分析命令行参数。这里是基于选项(解析参数)的逻辑。在

选项

错误

如果dctdcts为空,则应返回上面的值None。 错误将被发送到主题errors,其中包含worker信息。在

贡献

使用预提交运行blackflake8

学分

张一凡(yzhang at哈佛商学院教育质量保证)在

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何从数组中打印int值?   prepared语句Java中奇怪的异常PreparedStatement:参数索引超出范围   封装如何在OOP Java中为主方法编码?   java某些手机显示快捷方式徽章需要什么权限?   java TextView不会随OnItemSelectedListener更改   java注释处理器不会自动触发吗?   java Spring JPA如何计算外键数   c#对于这个简单的OOAD问题,哪种设计最优雅?   java如何处理while循环中的异常?   Android Studio错误:进程'command'/home/draven/Downloads/安卓studio/jre/bin/java''结束,退出值为非零2   在使用Payara服务器的Vaadin应用程序中导航到根目录时,java将丢失上下文根目录   使用contentType application/json而不是application/jsonpatch+json的java修补程序   带有tomcat的java HAproxy连接不足   Java:在应用过滤器后创建一个简单的通用方法进行计数   java如何使用多态性创建一个实例化对象的方法,然后用它们高效地填充ArrayList?