使用aws lambda函数和kinisis的无服务器工作流体系结构

xFlow的Python项目详细描述



xflow
=
使用aws lambda函数和kinesis的无服务器工作流体系结构。



-除了不将
输出作为输入输入输入外,上一步将发布下一步订阅的事件。这样,每个步骤都可以独立执行并并行运行。

w使用aws kinisis作为流媒体引擎来发布和订阅工作流中的事件。
如上所述,这些事件在订阅后将通过aws lambda函数进行处理。您提供给xflow的配置文件将是glue
,它将事件合并到工作流中的处理器。

您只需编写lambda函数的主体。

xflow支持aws lambda支持的所有语言。此时,lambdas只能从本地文件系统或s3上传。今后,我们将支持与
github集成。




创建示例工作流:
=========

-创建定义工作流的配置。例如:

`` yaml

>常规:
lambda_超时时间:3

区域:
子网ID:
安全组ID:
lambda_执行角色:lambda execute

lambda:
-名称:lambda_reader
描述:读取上载的文件的内容并发布这些内容。
source:/xflow/examples/wordcount/lambda_reader.py
handler:read
runtime:python2.7

-name:lambda_parser
description:读取内容,将其解析为一个单词数组。
source:s3://xflow/flows/wordcount/lambda_parser.py
handler:parse
runtime:python2.7

-name:lambda_filter
描述:从单词和非单词中筛选非字母。
源代码:/xflow/examples/wordcount/lambda_filter.py
处理程序:筛选出非单词
运行时:python2.7

-名称:lambda_aggregator
描述:将相似的单词分组并计数。
源代码:/xflow/examples/wordcount/lambda_aggregator.py
处理程序:aggregate
runtime:python2.7

-name:lambda_summationer
description:打印唯一的字数。
source:/xflow/examples/wordcount/lambda_summationer.py
handler:summary
runtime:python2.7

subscriptions:
-event:fileuploaded
subscribers:
-lambda_reader
-事件:filedownloaded
订阅服务器:
-lambda_parser
-事件:fileparsed
订阅服务器:
-lambda_filter
-事件:filefilter
订阅服务器:
-lambda_aggregator
-事件:fileaggregated
订阅服务器:
-lambda_摘要服务器
-事件:filesummarized
订阅者:

可选-可用于跟踪完整的工作流
工作流:
-id:compute_word_count
流:
-fileuploaded
-filedownloaded
-fileparsed
-filefilter
-fileaggregated
-filesummized
`````

-通过以下co设置工作流mmand:

`xflow word_count.cfg--在后台配置`

-这将执行以下操作:
-创建(或更新)aws lambda函数。
-对于每个lambda函数,将它们订阅到aws基于他们正在监听的事件的动态信息。
-一旦事件发布到aws动态信息系统,将执行lambda函数。

-通过以下命令跟踪工作流:

`xflow word\u count.cfg--track<;workflow\u id>;<;execution\id>;`

vi跟踪特定工作流的实例一个ITS的"执行ID"。因此,工作流中定义的所有
事件都必须包含此字段。由于需要按顺序定义事件,因此可以很容易地确定哪些事件已成功处理,哪些事件在过去7天的任何特定时间都未处理。


tracker的目标是返回给定工作流中所有事件的状态作为输入OnIdID。状态将指示是否处理了该事件。
然后它将获取最后一个未处理的事件,并获取其订阅服务器(lambdas)的所有日志,这些订阅服务器未能处理该事件。

跟踪器将自己订阅
该工作流中的每个流,因此它将接收发布到该流的事件。tracker本身是一个lambda函数,其名称为"tracker"<;workflow\u id>;`。因此,每次成功将事件发布到流时都会调用它。

实际跟踪是通过aws cloudwatchlogs完成的。为每个工作流创建一个cloudwatch"loggroup"。对于工作流执行的每个实例,都会创建一个"logstream"。

因此,当事件发布到流时,跟踪程序将接收该事件并提取
"execution"id。如果尚未创建,它将从此执行ID创建新的日志流。它将
然后将事件存储在该流中。由于lambda的名称是从工作流id生成的,因此也可以方便地提取日志流的日志组。


对于发布到kinisis流的每个后续事件,将调用工作流的相应lambda并将事件保存到日志流。

-在服务器模式下运行:

`xflow word_count.cfg—服务器`

这将通过服务器模式运行xflow。启动时,服务器将设置必要的流、lambda函数和工作流。然后,您可以将事件发布到流或以restful方式跟踪工作流执行。下面是您如何做到这一点的示例。

publishing:


`curl-xpost localhost/publish-d{"stream":"fileuploaded","event":{"ex1","message":"test with ccc"}`

tracking:

`curl-v localhost/track/workflows/compute\word\count/ex1`



installation:
=============


>
>
>``bash
>>;pip-install-xflow
```
>
>
>>
>
>>
>``bash
``````````>>>
>
>
>>
>
>
>
>
>>
>
>
>
>>
>>
>>
;pip-bash
>>>
>>>>
;pip
;pip
>>>>>
;pip-sha(和路线图):集成github(因此我们可以从那里下载lambda函数,并提供链接)
-监视lambda函数
-将日志(或路由日志的能力)集中记录到日志服务器
-ci/cd以获取lambda函数

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java OnResizeListener或OnDrawListener或类似的东西   java Orika映射嵌套子列表   保存时java Heroku请求超时代码H12   数据库在Java中出现socket读取超时异常的原因是什么?   java如何更改来自Sqlite数据库的特定数据在Listview中的行颜色   java JAXB解组器无法正确处理XML中的列表   java Android日期时区让我抓狂   java不透明属性在Swing中如何工作?   eclipse从JavaEE代码生成流程图   java如何在Hibernate中从相关表中获取计数   java Glassfish部署了项目的依赖项库   java使内容适合JavaFx中的WebView   java不满意的链接错误libcrypto。所以1.0.0   循环中java数组的使用   java找出哪个包调用服务