bspump是python 3.5的实时流处理器+

bspump的Python项目详细描述


Join the chat at https://gitter.im/TeskaLabs/bspump

原理

  • 写一次,多次使用
  • 一切都是一条小溪
  • 无模式
  • 卡帕建筑
  • 实时
  • 高性能
  • 简单易用且文档齐全,因此任何人都可以编写自己的流处理器
  • 通过python 3.5+async/awaitasyncio
  • 异步
  • Event driven Architecture/Reactor pattern
  • 单线程内核,但与线程兼容
  • pypy兼容,能够将python代码性能提高5倍以上的即时编译器
  • python生态系统的好公民
  • 模块化

流处理器示例

#!/usr/bin/env python3importbspumpimportbspump.socketimportbspump.commonimportbspump.elasticsearchclassMyPipeline(bspump.Pipeline):def__init__(self,app):super().__init__(app)self.build(bspump.socket.TCPStreamSource(app,self),bspump.common.JSONParserProcessor(app,self),bspump.elasticsearch.ElasticSearchSink(app,self,"ESConnection"))if__name__=='__main__':app=bspump.BSPumpApplication()svc=app.get_service("bspump.PumpService")svc.add_connection(bspump.elasticsearch.ElasticSearchConnection(app,"ESConnection"))svc.add_pipeline(MyPipeline(app))app.run()

视频教程

http://img.youtube.com/vi/QvjiPxO4w6w/0.jpg

空白应用程序设置

您可以从it’s own repository克隆空白应用程序。

可用技术

  • bspump.amqpamqp/rabbitmq连接,源和汇
  • bspump.avroapache avro文件源和汇
  • bspump.common通用处理器和分析器
  • bspump.elasticsearchelasticsearch连接、源和汇
  • bspump.file文件源和接收器(普通文件、json、csv)
  • bspump.filter内容、属性和时间漂移过滤器
  • bspump.http.clienthttp客户端源,websocket客户端接收器
  • bspump.http.webhttp服务器源和接收器,websocket服务器源
  • bspump.influxdbinfloxdb连接和接收器
  • bspump.kafkakafka连接,源和汇
  • bspump.mailsmtp连接和接收器
  • bspump.mongodbmongodb连接和查找
  • bspump.mysqlmysql连接,源和汇
  • bspump.parquetapache parquet文件接收器
  • bspump.postgresqlpostgresql连接和接收器
  • bspump.slack松弛连接和接收器
  • bspump.sockettcp源,udp源
  • bspump.trigger机会触发器、pubsub触发器和周期触发器
  • bspump.crypto加密
    • 散列:sha224、sha256、sha384、sha512、sha1、md5、blake2b、blake2s
    • 对称加密:AES 128、AES 192、AES 256
  • bspump.analyzer
    • 时间窗口分析器
    • 会话分析器
    • 地理分析仪
    • 时间漂移分析仪
  • bspump.lookup
    • 地理IP查找
  • bspump.unittest
    • 用于测试处理器/管道的接口
  • bspump.oob带外接收器和引擎
  • bspump.webpump管道、查找等的api端点。

谷歌技术兼容矩阵表: https://docs.google.com/spreadsheets/d/1L1DvSuHuhKUyZ3FEFxqEKNpSoamPH2Z1ZaFuHyageoI/edit?usp=sharing

高级体系结构

Schema of BSPump high-level achitecture

单元测试

fromunittest.mockimportMagicMockfrombspump.unittestimportProcessorTestCaseclassMyProcessorTestCase(ProcessorTestCase):deftest_my_processor(self):# setup processor for testself.set_up_processor(my_project.processor.MyProcessor)# mock methods to suit your needs on pipeline ..self.Pipeline.method=MagicMock()# .. or instance of processormy_processor=self.Pipeline.locate_processor("MyProcessor")my_processor.method=MagicMock()output=self.execute([(None,{'foo':'bar'})]# Context, event)# assert outputself.assertEqual([eventforcontext,eventinoutput],[{'FOO':'BAR'}])# asssert expected calls on `self.Pipeline.method` or `my_processor.method`my_processor.method.assert_called_with(**expected)

单元测试的运行

python3 -m unittest test

可以用单元测试模块的位置替换test

许可证

bspump是一个开源软件,根据bsd 3条款许可证提供。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java Selenium Web驱动程序。负载策略不稳定   JAVAsql。SQLException:没有合适的驱动程序无法创建“oracle”类的JDBC驱动程序。jdbc。驾驶员用于连接URL的OracleDriver   java谷歌地图“优化路径点”如何解决旅行推销员的问题?   java这段代码可能会造成潜在的内存泄漏吗?   java更改JCombobox的高度   java如何清除jtable中的数据   JavaHadoop:错误安全性。UserGroupInformation:MapReduce程序中的PriviledgedActionException   java如何将sonarlint添加到gradle任务中?   性能为什么Java排序优于原语计数排序   java Spring属性PlaceHolderConfigure从数据库加载   java从泛型获取类不起作用   java Spring@Transactional传播属性   java试图拥有主菜单和子菜单类   XSL的java后处理步骤   java第一个字母和最后一个字母、第二个字母和倒数第二个字母之间的差值之和,依此类推,直到单词的中心   switch语句在Java中的下一个出发点   java 安卓如何在OnBackpress()时设置viewpager的特定项目位置?