基于流的异步编程

aiopype的Python项目详细描述


所有类型

python异步数据管道

aiopype允许使用 简单明了的开发方法。

aiopype创建一个集中的消息处理程序,以允许 处理器作为独立的非阻塞消息工作 生产者/消费者。

aiopype有4个主要概念:

  • 流量
  • 经理
  • 处理器
  • 消息处理程序

流量

流是aiopype的主要组件。流程是 运行管道管理器的可靠性。

Flow负责:

  • 启动所有注册经理
  • 处理管理器故障
  • 报告错误
  • 重新启动失败的管理器

经理

管理器负责注册从顶部到 底部。这意味着它必须注册一个源并连接到 消费者,直到管道最终输出。

处理器

处理器是消息的使用者/生产者。

来源

源是处理器的特殊情况。他们的特点是 它们可以永远运行,是任何管道的起点。

源的示例可能是:

  • 一个REST API轮询器
  • 一个Websocket客户机
  • 一个Cron作业

消息处理程序

消息处理程序是允许aiopype 规模。

流将以一个或多个源作为每个源的起点 注册经理。一旦源生成事件,消息将 触发后,处理程序将识别并触发相应的 处理程序。

有两个可用的消息处理程序:

  • 同步协议
  • 异步协议

同步协议

同步事件处理程序,顾名思义,是同步的, 这意味着一旦源发出消息,就必须在 管道的末端和源头可以继续正常工作 行为。这有利于开发,但不能满足 允许组件所需的异步事件驱动模式 孤立。

异步协议

SyncProtocol和AsyncProtocol的主要区别在于 后者使用分离的事件循环来评估是否有新消息 在等待处理的队列中,而第一个简单地开始处理 即时收到消息。这允许完全隔离 处理器。

示例

苹果股票处理器。

来源

我们的数据源是Yahoo Finance,用于从^{tt6}收集数据$ 股票价格。我们将使用aiopypeRestSource作为基类。

fromaiopype.sourcesimportRestSourceclassYahooRestSource(RestSource):"""
  Yahoo REST API source.
  """def__init__(self,name,handler,symbol):super().__init__(name,handler,'http://finance.yahoo.com/webservice/v1/symbols/{}/quote?format=json&view=detail'.format(symbol),{'exception_threshold':10,'request_interval':30})

处理器

我们的样本处理程序将从返回的 杰森。

fromaiopypeimportProcessorclassHandleRawData(Processor):defhandle(self,data,time):self.emit('price',time,data['list']['resources'][0]['resource']['fields']['price'])

输出

我们的输出处理器将价格数据写入csv文件。

importcsvclassCSVOutput(Processor):def__init__(self,name,handler,filename):super().__init__(name,handler)self.filename=filenamewithopen(self.filename,'w',newline='')ascsvfile:writer=csv.writer(csvfile,delimiter=';')writer.writerow(['time','price'])defwrite(self,time,price):withopen(self.filename,'w',newline='')ascsvfile:writer=csv.writer(csvfile,delimiter=';')writer.writerow([time,price])

经理

管理器将实例化SourceProcessorOutput。 它将把Sourcedata事件连接到Processor.handle 处理程序和Processor'sprice事件到Output.write处理程序。 这将是我们的数据管道。

fromaiopypeimportManagerclassYahooManager(Manager):name='yahoo_apple'def__init__(self,handler):super().__init__(handler)self.processor=HandleRawData(self.build_processor_name('processor'),self.handler)self.source=YahooRestSource(self.build_processor_name('source'),self.handler,'AAPL')self.writer=CSVOutput(self.build_processor_name('writer'),self.handler,'yahoo_appl.csv')self.source.on('data',self.processor.handle)self.processor.on('price',self.writer.write)

流量

我们的流配置将只有yahoo_apple管理器。

fromaiopypeimportAsyncFlowclassFlowConfig(object):FLOWS=['yahoo_apple']dataflow=AsyncFlow(FlowConfig())

主要方法:

只需启动数据流。

if__name__=="__main__":dataflow.start()

运行示例

在名为example.py的文件中编译上述所有代码并运行:

python example.py

在制品:

这种分散的机制使得分布式管道成为可能, 如果我们有协调人在节点之间。

变更日志

0.1.4/2016-07-14

  • #10避免未完成 流量(@jalpedrinha)

0.1.3/2016-07-11

  • #8修复异步协议 终止条件(@jalpedrinha)

0.1.2/2016-07-06

  • #6处理异常 来自异步协议侦听器(@jalpedrinha)

0.1.1/2016-07-05

  • #4避免失败 Pusher客户端断开(@jalpedrinha)

0.1.0/2016-07-05

  • #1添加流管理器 和处理器(@jalpedrinha)

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java流/下载文件,无需在Spring Boot中保留内存   kotlin/java是否有类似TryParse()的东西?   java Spring引导找不到用户类型的属性ID   java Spring 2到Spring 3的迁移:一个控制器用于多个视图   java包含多个程序包名为“安卓”的库。支持图样可提取'   java spring JPA是否支持自定义值框架?   java转义出基于文本字段的搜索栏   java AAPT:错误:未找到样式属性“安卓:attr/WindowsPlashCreenBackground”   java从文本文件中读取纬度和经度   java哪里可以找到如何使用排序的示例。顺序ignorecase(),其中nullhandling nulls last用于自定义Spring JPA如何提供查询   尝试使用Dialogflow上的Webhook动态给出响应时,java获取Webhook响应错误(206)   如何在java中替换匹配的字符串?   java模拟数学的最大价值。随机的   java Spring RestTemplate GET请求未给出正确响应   春爪哇。lang.IllegalArgumentException   java系统。load()永远不会发生   java剪辑循环不工作   java如何从maven构建中删除staxapi   java ThreadPoolExecutor的排队行为是否可以自定义,以更喜欢创建新线程而不是排队?