CDK Construct Library for AWS::StepFunctions
Detailed description of the aws-cdk.aws-stepfunctions Python project
AWS Step Functions Construct Library
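This is a developer preview (public beta) module. Releases might lack important features and might have future breaking changes.
This API is still under active development and subject to non-backward compatible changes or removal in any future version. Use of the API is not recommended in production environments. Experimental APIs are not subject to the Semantic Versioning model.
The @aws-cdk/aws-stepfunctions package contains constructs for building serverless workflows using objects. Use this in conjunction with the @aws-cdk/aws-stepfunctions-tasks package, which contains classes used to call other AWS services.
Defining a workflow looks like this (for the Step Functions Job Poller example):
TypeScript example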
    import sfn = require('@aws-cdk/aws-stepfunctions');
    import tasks = require('@aws-cdk/aws-stepfunctions-tasks');
    // The two imports below are needed for lambda.Function and Duration
    import lambda = require('@aws-cdk/aws-lambda');
    import { Duration } from '@aws-cdk/core';

    const submitLambda = new lambda.Function(this, 'SubmitLambda', { ... });
    const getStatusLambda = new lambda.Function(this, 'CheckLambda', { ... });

    const submitJob = new sfn.Task(this, 'Submit Job', {
        task: new tasks.InvokeFunction(submitLambda),
        // Put Lambda's result here in the execution's state object
        resultPath: '$.guid',
    });

    const waitX = new sfn.Wait(this, 'Wait X Seconds', {
        duration: sfn.WaitDuration.secondsPath('$.wait_time'),
    });

    const getStatus = new sfn.Task(this, 'Get Job Status', {
        task: new tasks.InvokeFunction(getStatusLambda),
        // Pass just the field named "guid" into the Lambda, put the
        // Lambda's result in a field called "status"
        inputPath: '$.guid',
        resultPath: '$.status',
    });

    const jobFailed = new sfn.Fail(this, 'Job Failed', {
        cause: 'AWS Batch Job Failed',
        error: 'DescribeJob returned FAILED',
    });

    const finalStatus = new sfn.Task(this, 'Get Final Job Status', {
        task: new tasks.InvokeFunction(getStatusLambda),
        // Use "guid" field as input, output of the Lambda becomes the
        // entire state machine output.
        inputPath: '$.guid',
    });

    const definition = submitJob
        .next(waitX)
        .next(getStatus)
        .next(new sfn.Choice(this, 'Job Complete?')
            // Look at the "status" field
            .when(sfn.Condition.stringEquals('$.status', 'FAILED'), jobFailed)
            .when(sfn.Condition.stringEquals('$.status', 'SUCCEEDED'), finalStatus)
            .otherwise(waitX));

    new sfn.StateMachine(this, 'StateMachine', {
        definition,
        timeout: Duration.minutes(5)
    });
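State Machine
A stepfunctions.StateMachine is a resource that takes a state machine definition. The definition is specified by its start state, and encompasses all states reachable from the start state: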
    const startState = new stepfunctions.Pass(this, 'StartState');

    new stepfunctions.StateMachine(this, 'StateMachine', {
        definition: startState
    });
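State machines execute using an IAM Role, which will automatically have all permissions added that are required to make all state machine tasks execute properly (for example, permissions to invoke any Lambda functions you add to your workflow). A role will be created by default, but you can supply an existing one as well.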
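As a rough illustration of "all states reachable from the start state", the following plain TypeScript sketch walks a graph of state names breadth-first. It does not use the CDK: `SketchState`, `reachableStateIds`, and the `Orphan` entry are hypothetical names made up for this example, and the state names are borrowed from the job poller workflow above.

```typescript
// Hypothetical minimal model of a workflow state: only the names of the
// states it can transition to. This is not the CDK's State class.
interface SketchState {
  readonly nextIds: string[];
}

// Collect every state name reachable from `startId` (breadth-first).
function reachableStateIds(
  startId: string,
  states: Record<string, SketchState>
): string[] {
  const visited: string[] = [];
  const queue: string[] = [startId];
  while (queue.length > 0) {
    const id = queue.shift();
    if (id === undefined || visited.indexOf(id) !== -1) {
      continue;
    }
    const state = states[id];
    if (state === undefined) {
      throw new Error("Unknown state: " + id);
    }
    visited.push(id);
    for (const nextId of state.nextIds) {
      queue.push(nextId);
    }
  }
  return visited;
}

// "Orphan" is declared but nothing transitions to it.
const jobPollerStates: Record<string, SketchState> = {
  "Submit Job": { nextIds: ["Wait X Seconds"] },
  "Wait X Seconds": { nextIds: ["Get Job Status"] },
  "Get Job Status": { nextIds: ["Job Complete?"] },
  "Job Complete?": {
    nextIds: ["Job Failed", "Get Final Job Status", "Wait X Seconds"],
  },
  "Job Failed": { nextIds: [] },
  "Get Final Job Status": { nextIds: [] },
  "Orphan": { nextIds: [] },
};

const definitionStateIds = reachableStateIds("Submit Job", jobPollerStates);
```

In this sketch `definitionStateIds` holds the six connected states and leaves out `Orphan`, because no transition leads to it.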
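Amazon States Language
This library comes with a set of classes that model the Amazon States Language. The following State classes are supported:
Task
Pass
Wait
Choice
Parallel
Succeed
Fail
An arbitrary JSON object (specified at execution start) is passed from state to state and transformed during the execution of the workflow. For more information, see the States Language spec.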
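Task
A Task represents some work that needs to be done. The exact work to be done is determined by a class that implements IStepFunctionsTask, a collection of which can be found in the @aws-cdk/aws-stepfunctions-tasks package. A couple of the tasks available are:
tasks.InvokeActivity -- start an Activity (Activities represent a work queue that you poll on a compute fleet you manage yourself)
tasks.InvokeFunction -- invoke a Lambda function with function ARN
tasks.RunLambdaTask -- call Lambda as integrated service with magic ARN
tasks.PublishToTopic -- publish a message to an SNS topic
tasks.SendToQueue -- send a message to an SQS queue
tasks.RunEcsFargateTask / ecs.RunEcsEc2Task -- run a container task, depending on the type of capacity
tasks.SagemakerTrainTask -- run a SageMaker training job
tasks.SagemakerTransformTask -- run a SageMaker transform job
tasks.StartExecution -- call StartExecution on a Step Functions state machine
Except for tasks.InvokeActivity and tasks.InvokeFunction, the service integration pattern (integrationPattern) should be supplied as a parameter when you want to call an integrated service within a Task state. The default value is FIRE_AND_FORGET.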
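Task parameters from the state JSON
Many tasks take parameters. The values for those can either be supplied directly in the workflow definition (by specifying their values), or at runtime by passing a value obtained from the static functions on Data, such as Data.stringAt(). If so, the value is taken from the indicated location in the state JSON, similar to (for example) inputPath.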
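To make "taking a value from a location in the state JSON" more concrete, here is a minimal sketch in plain TypeScript. It is not CDK code and not how the service resolves paths internally: `lookupPath` is a hypothetical helper that understands only `$` and simple dotted paths such as `$.state.message`, whereas real JSONPath supports much more.

```typescript
// JSON-like value type used for the execution state in this sketch.
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

// Hypothetical helper: resolve a simple dotted path ("$", "$.a", "$.a.b").
function lookupPath(state: Json, path: string): Json {
  if (path === "$") {
    return state;
  }
  if (path.slice(0, 2) !== "$.") {
    throw new Error("Path must start with '$.': " + path);
  }
  let current: Json = state;
  for (const key of path.slice(2).split(".")) {
    if (current === null || typeof current !== "object" || Array.isArray(current)) {
      throw new Error("Cannot descend into a non-object at '" + key + "'");
    }
    const child: Json | undefined = current[key];
    if (child === undefined) {
      throw new Error("No value at '" + key + "' for path " + path);
    }
    current = child;
  }
  return current;
}

// The value is only known once an execution supplies its state.
const executionState: Json = { state: { message: "hello" }, field2: "abc" };
const messageValue = lookupPath(executionState, "$.state.message");
```

The point of the sketch is the timing: a literal parameter is fixed when the workflow is defined, while a path-based parameter is looked up in whatever JSON the execution carries when the task runs.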
Lambda example - InvokeFunction
    const task = new sfn.Task(this, 'Invoke1', {
        task: new tasks.InvokeFunction(myLambda),
        inputPath: '$.input',
        timeout: Duration.minutes(5),
    });

    // Add a retry policy
    task.addRetry({
        interval: Duration.seconds(5),
        maxAttempts: 10
    });

    // Add an error handler
    task.addCatch(errorHandlerState);

    // Set the next state
    task.next(nextState);
Lambda example - RunLambdaTask
    const task = new sfn.Task(stack, 'Invoke2', {
        task: new tasks.RunLambdaTask(myLambda, {
            integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
            payload: {
                token: sfn.Context.taskToken
            }
        })
    });
SNS example
    import sns = require('@aws-cdk/aws-sns');

    // ...

    const topic = new sns.Topic(this, 'Topic');

    // Use a field from the execution data as message.
    const task1 = new sfn.Task(this, 'Publish1', {
        task: new tasks.PublishToTopic(topic, {
            integrationPattern: sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
            message: sfn.TaskInput.fromDataAt('$.state.message'),
        })
    });

    // Combine a field from the execution data with
    // a literal object.
    const task2 = new sfn.Task(this, 'Publish2', {
        task: new tasks.PublishToTopic(topic, {
            message: sfn.TaskInput.fromObject({
                field1: 'somedata',
                field2: sfn.Data.stringAt('$.field2'),
            })
        })
    });
SQS example
    import sqs = require('@aws-cdk/aws-sqs');

    // ...

    // The queue comes from the sqs module imported above
    const queue = new sqs.Queue(this, 'Queue');

    // Use a field from the execution data as message.
    const task1 = new sfn.Task(this, 'Send1', {
        task: new tasks.SendToQueue(queue, {
            messageBody: sfn.TaskInput.fromDataAt('$.message'),
            // Only for FIFO queues
            messageGroupId: '1234'
        })
    });

    // Combine a field from the execution data with
    // a literal object.
    const task2 = new sfn.Task(this, 'Send2', {
        task: new tasks.SendToQueue(queue, {
            messageBody: sfn.TaskInput.fromObject({
                field1: 'somedata',
                field2: sfn.Data.stringAt('$.field2'),
            }),
            // Only for FIFO queues
            messageGroupId: '1234'
        })
    });
ECS example
    import ecs = require('@aws-cdk/aws-ecs');

    // See examples in ECS library for initialization of 'cluster' and 'taskDefinition'

    // RunEcsFargateTask is listed under the tasks package in the task list above
    const fargateTask = new tasks.RunEcsFargateTask({
        cluster,
        taskDefinition,
        containerOverrides: [
            {
                containerName: 'TheContainer',
                environment: [
                    {
                        name: 'CONTAINER_INPUT',
                        value: sfn.Data.stringAt('$.valueFromStateData')
                    }
                ]
            }
        ]
    });

    fargateTask.connections.allowToDefaultPort(rdsCluster, 'Read the database');

    const task = new sfn.Task(this, 'CallFargate', {
        task: fargateTask
    });
SageMaker Transform example
    const transformJob = new tasks.SagemakerTransformTask({
        transformJobName: "MyTransformJob",
        modelName: "MyModelName",
        role,
        transformInput: {
            transformDataSource: {
                s3DataSource: {
                    s3Uri: 's3://inputbucket/train',
                    s3DataType: S3DataType.S3Prefix,
                }
            }
        },
        transformOutput: {
            s3OutputPath: 's3://outputbucket/TransformJobOutputPath',
        },
        transformResources: {
            instanceCount: 1,
            instanceType: ec2.InstanceType.of(ec2.InstanceClass.M4, ec2.InstanceSize.XLarge),
        }
    });

    const task = new sfn.Task(this, 'Batch Inference', {
        task: transformJob
    });
Step Functions example
    // Define a state machine with one Pass state
    const child = new sfn.StateMachine(stack, 'ChildStateMachine', {
        definition: sfn.Chain.start(new sfn.Pass(stack, 'PassState')),
    });

    // Include the state machine in a Task state with callback pattern
    const task = new sfn.Task(stack, 'ChildTask', {
        task: new tasks.ExecuteStateMachine(child, {
            integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
            input: {
                token: sfn.Context.taskToken,
                foo: 'bar'
            },
            name: 'MyExecutionName'
        })
    });

    // Define a second state machine with the Task state above
    new sfn.StateMachine(stack, 'ParentStateMachine', {
        definition: task
    });
Pass
A Pass state does no work, but it can optionally transform the execution's JSON state.
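One way to picture a Pass state that injects a fixed result into the execution's JSON is the sketch below. It is plain TypeScript rather than CDK code, and `applyResultPath` is a hypothetical helper that supports only `$` and simple dotted result paths.

```typescript
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };
type JsonObject = { [key: string]: Json };

// Hypothetical helper: return a copy of `input` with `result` stored at a
// simple dotted result path such as "$.subObject". "$" replaces the whole state.
function applyResultPath(input: JsonObject, result: Json, resultPath: string): Json {
  if (resultPath === "$") {
    return result;
  }
  if (resultPath.slice(0, 2) !== "$.") {
    throw new Error("Unsupported result path: " + resultPath);
  }
  const keys = resultPath.slice(2).split(".");
  const lastKey = keys.pop();
  if (lastKey === undefined) {
    throw new Error("Empty result path");
  }
  const output: JsonObject = { ...input };
  let cursor: JsonObject = output;
  for (const key of keys) {
    const existing = cursor[key];
    // Copy nested objects on the way down so the input is never mutated.
    const copy: JsonObject =
      existing !== null && typeof existing === "object" && !Array.isArray(existing)
        ? { ...existing }
        : {};
    cursor[key] = copy;
    cursor = copy;
  }
  cursor[lastKey] = result;
  return output;
}

// The state before and after a Pass that adds { hello: "world" } at $.subObject.
const passInput: JsonObject = { greeting: "hi" };
const passOutput = applyResultPath(passInput, { hello: "world" }, "$.subObject");
```

Here `passOutput` keeps the original `greeting` field and gains a `subObject` field, while `passInput` is left unchanged.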
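Wait
A Wait state waits for a given number of seconds, or until the current time hits a particular time. The time to wait may be taken from the execution's JSON state.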
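The three ways of specifying a wait (a fixed number of seconds, a number read from the state JSON, or a target time) can be sketched as follows. This is plain TypeScript with hypothetical names (`WaitSpec`, `resolveWaitSeconds`), not the CDK API, and it handles only top-level paths such as `$.wait_time`.

```typescript
// Hypothetical model of the ways a Wait state can be configured.
type WaitSpec =
  | { kind: "seconds"; seconds: number }
  | { kind: "secondsPath"; path: string }
  | { kind: "timestamp"; at: Date };

// Work out how long to wait, in seconds, for a given state and current time.
function resolveWaitSeconds(
  spec: WaitSpec,
  state: { [key: string]: unknown },
  now: Date
): number {
  if (spec.kind === "seconds") {
    return spec.seconds;
  }
  if (spec.kind === "timestamp") {
    // Never negative: a timestamp in the past means "do not wait".
    return Math.max(0, (spec.at.getTime() - now.getTime()) / 1000);
  }
  // Only top-level paths such as "$.wait_time" are handled in this sketch.
  if (spec.path.slice(0, 2) !== "$.") {
    throw new Error("Unsupported path: " + spec.path);
  }
  const value = state[spec.path.slice(2)];
  if (typeof value !== "number" || value < 0) {
    throw new Error("Expected a non-negative number at " + spec.path);
  }
  return value;
}
```

For example, calling `resolveWaitSeconds({ kind: "secondsPath", path: "$.wait_time" }, { wait_time: 5 }, new Date())` reads the wait time from the state, which mirrors the `secondsPath('$.wait_time')` usage in the job poller example.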
Choice
A Choice state can take a different path through the workflow based on the values in the execution's JSON state.
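If you want to temporarily branch your workflow based on a condition, but have all branches come together and continue as one (similar to how an if ... then ... else works in a programming language), use the .afterwards() method.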
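If your Choice doesn't have an otherwise() and none of the conditions match the JSON state, a NoChoiceMatched error will be thrown. Wrap the state machine in a Parallel state if you want to catch and recover from this.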
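The branching behaviour described in this section, including what happens when nothing matches, can be sketched in plain TypeScript. `ChoiceRule` and `chooseNext` are hypothetical names for this illustration and support only string equality on a top-level field; the error message reuses the `States.NoChoiceMatched` name from the Amazon States Language.

```typescript
// Hypothetical model of a Choice: ordered string-equality rules on a
// top-level field of the state, plus an optional fallback target.
interface ChoiceRule {
  readonly field: string; // e.g. "status" for the path "$.status"
  readonly equals: string;
  readonly next: string; // name of the state to go to
}

function chooseNext(
  rules: ChoiceRule[],
  state: { [key: string]: unknown },
  otherwise?: string
): string {
  // Rules are checked in order; the first match wins.
  for (const rule of rules) {
    if (state[rule.field] === rule.equals) {
      return rule.next;
    }
  }
  if (otherwise !== undefined) {
    return otherwise;
  }
  // No rule matched and there is no fallback.
  throw new Error("States.NoChoiceMatched");
}

// Rules taken from the job poller example.
const jobCompleteRules: ChoiceRule[] = [
  { field: "status", equals: "FAILED", next: "Job Failed" },
  { field: "status", equals: "SUCCEEDED", next: "Get Final Job Status" },
];
```

With a fallback, `chooseNext(jobCompleteRules, { status: "RUNNING" }, "Wait X Seconds")` loops back to the wait state; without one, the same call throws, which is the situation the paragraph above suggests guarding against.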
Parallel
A Parallel state executes one or more subworkflows in parallel. It can also be used to catch and recover from errors in subworkflows.
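A simplified picture of these two roles (fan out over branches, and catch errors raised inside them) is sketched below in plain TypeScript. The names `Branch` and `runParallel` are hypothetical, branches are modeled as ordinary synchronous functions, and they run one after another here, whereas the real service runs them concurrently.

```typescript
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

// A branch is modeled as a function from the shared input to its own output.
type Branch = (input: Json) => Json;

interface ParallelOutcome {
  readonly ok: boolean;
  readonly output: Json;
}

// Give every branch the same input. On success the output is the array of
// branch outputs, in branch order. If any branch throws, the error is caught
// and turned into a recovery value, in the spirit of a catch handler.
function runParallel(
  branches: Branch[],
  input: Json,
  onError: (message: string) => Json
): ParallelOutcome {
  try {
    const outputs: Json[] = branches.map(function (branch) {
      return branch(input);
    });
    return { ok: true, output: outputs };
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    return { ok: false, output: onError(message) };
  }
}

const parallelOk = runParallel(
  [function (input) { return input; }, function () { return "done"; }],
  { guid: "123" },
  function (message) { return { error: message }; }
);
```

In this sketch `parallelOk.output` is a two-element array, one entry per branch. A branch that throws would instead produce the value returned by `onError`, which is how wrapping a workflow in a Parallel state makes its errors recoverable.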
Succeed
Reaching a Succeed state terminates the state machine execution with a successful status.
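Fail
Reaching a Fail state terminates the state machine execution with a failure status. The Fail state should report the reason for the failure. Failures can be caught by encompassing Parallel states.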
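For orientation, the two terminal states can be written down as Amazon States Language JSON shapes. The sketch below is plain TypeScript, not CDK code; `finalStatusOf` is a hypothetical helper, and the error and cause strings are the ones used in the job poller example.

```typescript
// Terminal state shapes as defined by the Amazon States Language.
interface SucceedStateJson {
  Type: "Succeed";
}

interface FailStateJson {
  Type: "Fail";
  Error?: string; // short error name
  Cause?: string; // human-readable reason for the failure
}

type TerminalStateJson = SucceedStateJson | FailStateJson;

// Hypothetical helper: the execution status implied by reaching a terminal state.
function finalStatusOf(state: TerminalStateJson): "SUCCEEDED" | "FAILED" {
  return state.Type === "Succeed" ? "SUCCEEDED" : "FAILED";
}

const jobFailedJson: FailStateJson = {
  Type: "Fail",
  Error: "DescribeJob returned FAILED",
  Cause: "AWS Batch Job Failed",
};
```

The `Error` and `Cause` fields are where a Fail state reports why the execution ended, which is what the `error` and `cause` properties in the job poller example populate.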
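Task Chaining
To make defining workflows as convenient (and readable in a top-to-bottom way) as writing regular programs, it is possible to chain most method invocations. In particular, the .next() method can be repeated. The result of a series of .next() calls is called a Chain, and can be used when defining the jump targets of Choice.on or Parallel.branch.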
If you don't like the visual look of starting a chain directly off the first step, you can use Chain.start.
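The chaining idea can be modeled with a few lines of plain TypeScript. The classes below (`SketchStep`, `SketchChain`) are hypothetical stand-ins that only mimic the shape of `.next()` and of an explicit chain start; they are not the CDK's State or Chain classes.

```typescript
// Hypothetical mini-model of chaining; not the CDK's classes.
class SketchStep {
  public nextStep?: SketchStep;

  constructor(public readonly id: string) {}

  // Link this step to `target` and return a chain that ends at `target`.
  public next(target: SketchStep): SketchChain {
    this.nextStep = target;
    return new SketchChain(this, target);
  }
}

class SketchChain {
  constructor(
    public readonly start: SketchStep,
    public readonly end: SketchStep
  ) {}

  // Begin a chain explicitly from its first step.
  public static start(first: SketchStep): SketchChain {
    return new SketchChain(first, first);
  }

  // Extend the chain: the start stays the same, the end moves forward.
  public next(target: SketchStep): SketchChain {
    this.end.nextStep = target;
    return new SketchChain(this.start, target);
  }

  // List step ids in execution order (stops if a step repeats).
  public ids(): string[] {
    const result: string[] = [];
    let cursor: SketchStep | undefined = this.start;
    while (cursor !== undefined && result.indexOf(cursor.id) === -1) {
      result.push(cursor.id);
      cursor = cursor.nextStep;
    }
    return result;
  }
}

const step1 = new SketchStep("Step1");
const step2 = new SketchStep("Step2");
const step3 = new SketchStep("Step3");

// Chain directly off the first step...
const viaNext = step1.next(step2).next(step3);
// ...or start the chain explicitly.
const viaStart = SketchChain.start(new SketchStep("A")).next(new SketchStep("B"));
```

Both styles produce the same kind of object. Because a chain remembers its start as well as its end, the whole sequence can be handed around as a single jump target.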
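State Machine Fragments
A state machine fragment can be defined by writing a construct that implements IChainable, which requires you to define two fields:
startState: State, representing the entry point into this state machine.
endStates: INextable[], representing the (one or more) states that outgoing transitions will be added to if you chain onto the fragment.
Since states will be named after their construct IDs, you may need to prefix the IDs of states if you plan to instantiate the same state machine fragment multiple times (otherwise all states in every instantiation would have the same name).
The class StateMachineFragment contains some helper functions (like prefixStates()) to make it easier for you to do this. If you define your state machine as a subclass of this, it will be convenient to use.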
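Why prefixing matters when a fragment is instantiated more than once can be seen in the following plain TypeScript sketch. `SketchFragment` and `prefixFragment` are hypothetical and model a fragment as state names plus transitions; they do not reproduce the behaviour of StateMachineFragment or prefixStates().

```typescript
// Hypothetical fragment model: a start state, end states, and transitions,
// all identified by state name.
interface SketchFragment {
  readonly startState: string;
  readonly endStates: string[];
  readonly transitions: { [from: string]: string[] };
}

// Return a copy of the fragment with every state name prefixed, so the same
// fragment can be instantiated twice without name clashes.
function prefixFragment(fragment: SketchFragment, prefix: string): SketchFragment {
  const addPrefix = function (id: string): string {
    return prefix + id;
  };
  const transitions: { [from: string]: string[] } = {};
  for (const from of Object.keys(fragment.transitions)) {
    const targets = fragment.transitions[from];
    if (targets !== undefined) {
      transitions[addPrefix(from)] = targets.map(addPrefix);
    }
  }
  return {
    startState: addPrefix(fragment.startState),
    endStates: fragment.endStates.map(addPrefix),
    transitions: transitions,
  };
}

const baseFragment: SketchFragment = {
  startState: "Start",
  endStates: ["Done"],
  transitions: { "Start": ["Work"], "Work": ["Done"], "Done": [] },
};

// Two instantiations of the same fragment, each with its own prefix.
const firstCopy = prefixFragment(baseFragment, "First: ");
const secondCopy = prefixFragment(baseFragment, "Second: ");
```

Without the prefixes both copies would contain states called `Start`, `Work`, and `Done`, and the two instantiations could not coexist in one state machine.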
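Activity
Activities represent work that is done on some non-Lambda worker pool. The Step Functions workflow will submit work to this Activity, and a worker pool that you run yourself, probably on EC2, will pull jobs from the Activity and submit the results of individual jobs back.
You need the ARN to do so, so if you use Activities be sure to pass the Activity ARN into your worker pool.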
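Metrics
Task objects expose various metrics on the execution of that particular task. For example, you can create an alarm on a particular task failing.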
There are also metrics on the complete state machine.
And there are metrics on the capacity of all state machines in your account.
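Future work
Contributions welcome:
- [ ] A single LambdaTask class that is both a Lambda and a Task in one might make for a nice API.
- [ ] Expression parser for Conditions.
- [ ] Simulate state machines in unit tests.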