基于流的异步编程

aiopype的Python项目详细描述


所有类型

python异步数据管道

aiopype允许使用 简单明了的开发方法。

aiopype创建一个集中的消息处理程序,以允许 处理器作为独立的非阻塞消息工作 生产者/消费者。

aiopype有4个主要概念:

  • 流量
  • 经理
  • 处理器
  • 消息处理程序

流量

流是aiopype的主要组件。流程是 运行管道管理器的可靠性。

Flow负责:

  • 启动所有注册经理
  • 处理管理器故障
  • 报告错误
  • 重新启动失败的管理器

经理

管理器负责注册从顶部到 底部。这意味着它必须注册一个源并连接到 消费者,直到管道最终输出。

处理器

处理器是消息的使用者/生产者。

来源

源是处理器的特殊情况。他们的特点是 它们可以永远运行,是任何管道的起点。

源的示例可能是:

  • 一个REST API轮询器
  • 一个Websocket客户机
  • 一个Cron作业

消息处理程序

消息处理程序是允许aiopype 规模。

流将以一个或多个源作为每个源的起点 注册经理。一旦源生成事件,消息将 触发后,处理程序将识别并触发相应的 处理程序。

有两个可用的消息处理程序:

  • 同步协议
  • 异步协议

同步协议

同步事件处理程序,顾名思义,是同步的, 这意味着一旦源发出消息,就必须在 管道的末端和源头可以继续正常工作 行为。这有利于开发,但不能满足 允许组件所需的异步事件驱动模式 孤立。

异步协议

SyncProtocol和AsyncProtocol的主要区别在于 后者使用分离的事件循环来评估是否有新消息 在等待处理的队列中,而第一个简单地开始处理 即时收到消息。这允许完全隔离 处理器。

示例

苹果股票处理器。

来源

我们的数据源是Yahoo Finance,用于从^{tt6}收集数据$ 股票价格。我们将使用aiopypeRestSource作为基类。

fromaiopype.sourcesimportRestSourceclassYahooRestSource(RestSource):"""
  Yahoo REST API source.
  """def__init__(self,name,handler,symbol):super().__init__(name,handler,'http://finance.yahoo.com/webservice/v1/symbols/{}/quote?format=json&view=detail'.format(symbol),{'exception_threshold':10,'request_interval':30})

处理器

我们的样本处理程序将从返回的 杰森。

fromaiopypeimportProcessorclassHandleRawData(Processor):defhandle(self,data,time):self.emit('price',time,data['list']['resources'][0]['resource']['fields']['price'])

输出

我们的输出处理器将价格数据写入csv文件。

importcsvclassCSVOutput(Processor):def__init__(self,name,handler,filename):super().__init__(name,handler)self.filename=filenamewithopen(self.filename,'w',newline='')ascsvfile:writer=csv.writer(csvfile,delimiter=';')writer.writerow(['time','price'])defwrite(self,time,price):withopen(self.filename,'w',newline='')ascsvfile:writer=csv.writer(csvfile,delimiter=';')writer.writerow([time,price])

经理

管理器将实例化SourceProcessorOutput。 它将把Sourcedata事件连接到Processor.handle 处理程序和Processor'sprice事件到Output.write处理程序。 这将是我们的数据管道。

fromaiopypeimportManagerclassYahooManager(Manager):name='yahoo_apple'def__init__(self,handler):super().__init__(handler)self.processor=HandleRawData(self.build_processor_name('processor'),self.handler)self.source=YahooRestSource(self.build_processor_name('source'),self.handler,'AAPL')self.writer=CSVOutput(self.build_processor_name('writer'),self.handler,'yahoo_appl.csv')self.source.on('data',self.processor.handle)self.processor.on('price',self.writer.write)

流量

我们的流配置将只有yahoo_apple管理器。

fromaiopypeimportAsyncFlowclassFlowConfig(object):FLOWS=['yahoo_apple']dataflow=AsyncFlow(FlowConfig())

主要方法:

只需启动数据流。

if__name__=="__main__":dataflow.start()

运行示例

在名为example.py的文件中编译上述所有代码并运行:

python example.py

在制品:

这种分散的机制使得分布式管道成为可能, 如果我们有协调人在节点之间。

变更日志

0.1.4/2016-07-14

  • #10避免未完成 流量(@jalpedrinha)

0.1.3/2016-07-11

  • #8修复异步协议 终止条件(@jalpedrinha)

0.1.2/2016-07-06

  • #6处理异常 来自异步协议侦听器(@jalpedrinha)

0.1.1/2016-07-05

  • #4避免失败 Pusher客户端断开(@jalpedrinha)

0.1.0/2016-07-05

  • #1添加流管理器 和处理器(@jalpedrinha)

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java Android首选项相同getPreference()值的不同数据   字符串如何在Java中连接两个列表中的单词   安卓 Java Socket编程:检测客户端和服务器是否连接   使用JTextField的java无法将值转换为int   在java中,如何在不通过超级构造函数设置的情况下将消息设置为自定义异常类   用于标识属性值的java正则表达式模式   Android中的java不可见谷歌地图   java正确取消启动ExecutorService的JavaFX任务   在非活动java类中使用安卓的融合位置提供程序,并在主活动类中获取经度和纬度   spring为什么我的大摇大摆不能用springboot在java中工作?   java JSF(2.2)ViewScope在Weblogic 12.2.1.2和JDK 8上使用Spring 4.3.7(在Mac OS和Docker Oracle Linux环境上)   java如何用一个按钮提高计时器速度   java如何检查字符是否是元音?   注册表引用了不存在的Java运行时环境安装或运行时已损坏错误   来自ResultSet java的mysql getDateTime   maven LanguageTool Java API是否具有“无用”依赖关系?   twitter api身份验证的java Trycatch问题   java在Apache Struts 1.1中是否可以显式白名单?   安卓致命异常:主java。lang.RuntimeException:被问及未知片段