一种流处理的流水线框架
tanbih-pipeline的Python项目详细描述
支持RabbitMQ、Pulsar、Kafka和Redis的灵活流处理框架。在
特点
- 每封邮件至少有一次确认
- 通过消费者群体横向扩展
- 在部署中控制流程,开发一次,在任何地方使用它
- 提供文件和内存输入/输出的可测试性
参数
- kind-指定管道的下划线技术,例如KAFKA或RabbitMQ
- MEM—基于内存的队列(适用于unittests)
- 基于文件的队列(适用于开发和集成测试)
发电机
生成器将在我们的管道中开发数据源时使用。消息来源 会产生没有输入的输出。爬虫可以看作是发电机。在
>>>frompipelineimportGenerator,Message>>>>>>classMyGenerator(Generator):...defgenerate(self):...foriinrange(10):...yield{'id':i}>>>>>>generator=MyGenerator('generator','0.1.0',description='simple generator')>>>generator.parse_args("--kind MEM --out-topic test".split())>>>generator.start()>>>[r.get('id')forringenerator.destination.results][0,1,2,3,4,5,6,7,8,9]
处理器
处理器用于处理输入。将进行修改。处理器 可以为每个输入产生一个输出,或者没有输出。在
^{pr2}$分路器
当写入多个输出时,将使用拆分器。它需要一个函数 根据处理消息生成输出主题,并在编写输出时使用它。在
>>>frompipelineimportSplitter,Message>>>>>>classMySplitter(Splitter):...defget_topic(self,msg):...return'{}-{}'.format(self.destination.topic,msg.get('id'))......defprocess(self,msg):...msg.update({...'processed':True,...})...returnNone>>>>>>splitter=MySplitter('splitter','0.1.0',description='simple splitter')>>>config={'data':[{'id':1}]}>>>splitter.parse_args("--kind MEM --in-topic test --out-topic test".split(),config=config)>>>splitter.start()>>>[r.get('id')forrinsplitter.destinations['test-1'].results][1]
使用
选择要从中生成子类的生成器、处理器或拆分器。在
环境变量
应用程序接受以下环境变量:
environment variable | command line argument | options |
---|---|---|
PIPELINE | –kind | KAFKA, PULSAR, FILE |
PULSAR | –pulsar | pulsar url |
TENANT | –tenant | pulsar tenant |
NAMESPACE | –namespace | pulsar namespace |
SUBSCRIPTION | –subscription | pulsar subscription |
KAFKA | –kafka | kafka url |
GROUPID | –group-id | kafka group id |
INTOPIC | –in-topic | topic to read |
OUTTOPIC | –out-topic | topic to write to |
自定义代码
定义add_arguments以向worker添加新参数。在
定义安装程序以在辅助进程开始处理消息之前运行初始化代码。安装程序在之后调用 已分析命令行参数。这里是基于选项(解析参数)的逻辑。在
选项
错误
如果dct或dcts为空,则应返回上面的值None。 错误将被发送到主题errors,其中包含worker信息。在
贡献
使用预提交运行black和flake8
学分
张一凡(yzhang at哈佛商学院教育质量保证)在
- 项目
标签: