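The CDK Construct Library for AWS::StepFunctions

Detailed description of the Python project aws-cdk.aws-stepfunctions


AWS Step Functions Construct Library


Stability: Experimental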

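This is a developer preview (public beta) module. Releases might lack important features and might have future breaking changes.

This API is still under active development and subject to non-backward compatible changes or removal in any future version. Use of the API is not recommended in production environments. Experimental APIs are not subject to the Semantic Versioning model.


The @aws-cdk/aws-stepfunctions package contains constructs for building serverless workflows using objects. Use this in conjunction with the @aws-cdk/aws-stepfunctions-tasks package, which contains classes used to call other AWS services.

Defining a workflow looks like this (for the Step Functions Job Poller example):

TypeScript example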

import sfn = require('@aws-cdk/aws-stepfunctions');
import tasks = require('@aws-cdk/aws-stepfunctions-tasks');
import lambda = require('@aws-cdk/aws-lambda');
import { Duration } from '@aws-cdk/core';

const submitLambda = new lambda.Function(this, 'SubmitLambda', { ... });
const getStatusLambda = new lambda.Function(this, 'CheckLambda', { ... });

const submitJob = new sfn.Task(this, 'Submit Job', {
    task: new tasks.InvokeFunction(submitLambda),
    // Put Lambda's result here in the execution's state object
    resultPath: '$.guid',
});

const waitX = new sfn.Wait(this, 'Wait X Seconds', {
    duration: sfn.WaitDuration.secondsPath('$.wait_time'),
});

const getStatus = new sfn.Task(this, 'Get Job Status', {
    task: new tasks.InvokeFunction(getStatusLambda),
    // Pass just the field named "guid" into the Lambda, put the
    // Lambda's result in a field called "status"
    inputPath: '$.guid',
    resultPath: '$.status',
});

const jobFailed = new sfn.Fail(this, 'Job Failed', {
    cause: 'AWS Batch Job Failed',
    error: 'DescribeJob returned FAILED',
});

const finalStatus = new sfn.Task(this, 'Get Final Job Status', {
    task: new tasks.InvokeFunction(getStatusLambda),
    // Use "guid" field as input, output of the Lambda becomes the
    // entire state machine output.
    inputPath: '$.guid',
});

const definition = submitJob
    .next(waitX)
    .next(getStatus)
    .next(new sfn.Choice(this, 'Job Complete?')
        // Look at the "status" field
        .when(sfn.Condition.stringEquals('$.status', 'FAILED'), jobFailed)
        .when(sfn.Condition.stringEquals('$.status', 'SUCCEEDED'), finalStatus)
        .otherwise(waitX));

new sfn.StateMachine(this, 'StateMachine', {
    definition,
    timeout: Duration.minutes(5)
});

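State Machine

A stepfunctions.StateMachine is a resource that takes a state machine definition. The definition is specified by its start state, and encompasses all states reachable from the start state: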

const startState = new stepfunctions.Pass(this, 'StartState');

new stepfunctions.StateMachine(this, 'StateMachine', {
    definition: startState
});

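State machines execute using an IAM Role, which automatically has all permissions added that are required to make all state machine tasks execute properly (for example, permissions to invoke any Lambda functions you add to your workflow). A role is created by default, but you can supply an existing one as well.

Amazon States Language

This library comes with a set of classes that model the Amazon States Language. The following State classes are supported:

  • Task
  • Pass
  • Wait
  • Choice
  • Parallel
  • Succeed
  • Fail

An arbitrary JSON object (specified at execution start) is passed from state to state and transformed during the execution of the workflow. For more information, see the States Language spec.

Task

A Task represents some work that needs to be done. The exact work to be done is determined by a class that implements IStepFunctionsTask, a collection of which can be found in the @aws-cdk/aws-stepfunctions-tasks package. A couple of the tasks available are: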

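  • tasks.InvokeActivity -- start an Activity (Activities represent a work queue that you poll on a compute fleet you manage yourself)
  • tasks.InvokeFunction -- invoke a Lambda function with function ARN
  • tasks.RunLambdaTask -- call Lambda as integrated service with magic ARN
  • tasks.PublishToTopic -- publish a message to an SNS topic
  • tasks.SendToQueue -- send a message to an SQS queue
  • tasks.RunEcsFargateTask/tasks.RunEcsEc2Task -- run a container task, depending on the type of capacity.
  • tasks.SagemakerTrainTask -- run a SageMaker training job
  • tasks.SagemakerTransformTask -- run a SageMaker transform job
  • tasks.StartExecution -- call StartExecution to a state machine of Step Functions

Except for tasks.InvokeActivity and tasks.InvokeFunction, the service integration pattern (integrationPattern) should be supplied as a parameter when you want to call an integrated service within a Task state. The default value is FIRE_AND_FORGET.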

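Task parameters from the state JSON

Many tasks take parameters. The values for those can either be supplied directly in the workflow definition (by specifying their values), or at runtime by passing a value obtained from the static functions on Data, such as Data.stringAt().

If so, the value is taken from the indicated location in the state JSON, similar to (for example) inputPath.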
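
To illustrate what "taken from the indicated location in the state JSON" means, here is a minimal sketch in plain TypeScript that does not use the CDK. It assumes the Amazon States Language convention that a parameter key ending in ".$" holds a path into the state JSON. The helper names readPath and resolveParameters are hypothetical, and only simple dotted paths such as "$.a.b" are handled.

```typescript
// Hypothetical helpers, not part of the CDK: resolve task parameters
// against the execution's JSON state.
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

// Read a simple dotted path such as "$.a.b" from the state JSON.
function readPath(stateJson: Json, path: string): Json {
  if (path === '$') {
    return stateJson;
  }
  if (path.indexOf('$.') !== 0) {
    throw new Error(`Unsupported path: ${path}`);
  }
  let current: Json = stateJson;
  for (const segment of path.slice(2).split('.')) {
    if (current === null || typeof current !== 'object' || Array.isArray(current)) {
      throw new Error(`Path not found: ${path}`);
    }
    const next: Json | undefined = current[segment];
    if (next === undefined) {
      throw new Error(`Path not found: ${path}`);
    }
    current = next;
  }
  return current;
}

// Keys ending in ".$" are looked up in the state JSON; other values are literals.
function resolveParameters(template: { [key: string]: Json }, stateJson: Json): { [key: string]: Json } {
  const resolved: { [key: string]: Json } = {};
  for (const key of Object.keys(template)) {
    const value = template[key];
    if (key.slice(-2) === '.$' && typeof value === 'string') {
      resolved[key.slice(0, -2)] = readPath(stateJson, value);
    } else {
      resolved[key] = value;
    }
  }
  return resolved;
}

// field1 is a literal, field2 is read from the state JSON at run time.
const resolvedMessage = resolveParameters(
  { field1: 'somedata', 'field2.$': '$.field2' },
  { field2: 'from-state', other: 42 },
);
```

In the CDK examples that follow, a call such as Data.stringAt('$.field2') plays the role of the ".$" key in this sketch: it marks a value that is only known when the state machine runs.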

Lambda example - InvokeFunction

const task = new sfn.Task(this, 'Invoke1', {
    task: new tasks.InvokeFunction(myLambda),
    inputPath: '$.input',
    timeout: Duration.minutes(5),
});

// Add a retry policy
task.addRetry({
    interval: Duration.seconds(5),
    maxAttempts: 10
});

// Add an error handler
task.addCatch(errorHandlerState);

// Set the next state
task.next(nextState);

Lambda example - RunLambdaTask

const task = new sfn.Task(stack, 'Invoke2', {
    task: new tasks.RunLambdaTask(myLambda, {
        integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
        payload: {
            token: sfn.Context.taskToken
        }
    })
});

SNS example

import sns = require('@aws-cdk/aws-sns');

// ...

const topic = new sns.Topic(this, 'Topic');

// Use a field from the execution data as message.
const task1 = new sfn.Task(this, 'Publish1', {
    task: new tasks.PublishToTopic(topic, {
        integrationPattern: sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
        message: sfn.TaskInput.fromDataAt('$.state.message'),
    })
});

// Combine a field from the execution data with
// a literal object.
const task2 = new sfn.Task(this, 'Publish2', {
    task: new tasks.PublishToTopic(topic, {
        message: sfn.TaskInput.fromObject({
            field1: 'somedata',
            field2: sfn.Data.stringAt('$.field2'),
        })
    })
});

SQS example

import sqs = require('@aws-cdk/aws-sqs');

// ...

const queue = new sqs.Queue(this, 'Queue');

// Use a field from the execution data as message.
const task1 = new sfn.Task(this, 'Send1', {
    task: new tasks.SendToQueue(queue, {
        messageBody: sfn.TaskInput.fromDataAt('$.message'),
        // Only for FIFO queues
        messageGroupId: '1234'
    })
});

// Combine a field from the execution data with
// a literal object.
const task2 = new sfn.Task(this, 'Send2', {
    task: new tasks.SendToQueue(queue, {
        messageBody: sfn.TaskInput.fromObject({
            field1: 'somedata',
            field2: sfn.Data.stringAt('$.field2'),
        }),
        // Only for FIFO queues
        messageGroupId: '1234'
    })
});

ECS example

import ecs = require('@aws-cdk/aws-ecs');

// See examples in ECS library for initialization of 'cluster' and 'taskDefinition'

const fargateTask = new tasks.RunEcsFargateTask({
    cluster,
    taskDefinition,
    containerOverrides: [
        {
            containerName: 'TheContainer',
            environment: [
                {
                    name: 'CONTAINER_INPUT',
                    value: sfn.Data.stringAt('$.valueFromStateData')
                }
            ]
        }
    ]
});

fargateTask.connections.allowToDefaultPort(rdsCluster, 'Read the database');

const task = new sfn.Task(this, 'CallFargate', {
    task: fargateTask
});

SageMaker Transform example

const transformJob = new tasks.SagemakerTransformTask({
    transformJobName: "MyTransformJob",
    modelName: "MyModelName",
    role,
    transformInput: {
        transformDataSource: {
            s3DataSource: {
                s3Uri: 's3://inputbucket/train',
                s3DataType: S3DataType.S3Prefix,
            }
        }
    },
    transformOutput: {
        s3OutputPath: 's3://outputbucket/TransformJobOutputPath',
    },
    transformResources: {
        instanceCount: 1,
        instanceType: ec2.InstanceType.of(ec2.InstanceClass.M4, ec2.InstanceSize.XLarge),
    }
});

const task = new sfn.Task(this, 'Batch Inference', {
    task: transformJob
});

Step Functions example

// Define a state machine with one Pass state
const child = new sfn.StateMachine(stack, 'ChildStateMachine', {
    definition: sfn.Chain.start(new sfn.Pass(stack, 'PassState')),
});

// Include the state machine in a Task state with callback pattern
const task = new sfn.Task(stack, 'ChildTask', {
    task: new tasks.StartExecution(child, {
        integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
        input: {
            token: sfn.Context.taskToken,
            foo: 'bar'
        },
        name: 'MyExecutionName'
    })
});

// Define a second state machine with the Task state above
new sfn.StateMachine(stack, 'ParentStateMachine', {
    definition: task
});

Pass

A Pass state does no work, but it can optionally transform the execution's JSON state.
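
As a rough illustration of that transformation, the sketch below mimics in plain TypeScript what a Pass state with a fixed result and a result path does to the state JSON. It does not use the CDK. applyPassResult is a hypothetical helper that supports only single-level paths such as "$.subObject".

```typescript
// Hypothetical helper, not part of the CDK: mimic a Pass state that writes a
// fixed result into the execution's JSON state at a single-level path.
type PassStateJson = { [key: string]: unknown };

function applyPassResult(input: PassStateJson, resultPath: string, result: unknown): PassStateJson {
  if (resultPath.indexOf('$.') !== 0 || resultPath.indexOf('.', 2) !== -1) {
    throw new Error(`Only single-level paths like "$.field" are supported: ${resultPath}`);
  }
  // Copy the input so the incoming state is left untouched.
  const output: PassStateJson = {};
  for (const key of Object.keys(input)) {
    output[key] = input[key];
  }
  output[resultPath.slice(2)] = result;
  return output;
}

const passInput: PassStateJson = { orderId: 17 };
// The state JSON gains a "subObject" field holding the fixed result.
const passOutput = applyPassResult(passInput, '$.subObject', { hello: 'world' });
```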

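Wait

A Wait state waits for a given number of seconds, or until the current time hits a particular time. The time to wait may be taken from the execution's JSON state.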
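
The three ways of specifying a wait can be sketched in plain TypeScript, without CDK classes: a fixed number of seconds, a number read from a field of the state JSON, or a target timestamp. The WaitSpec type and resolveWaitSeconds function are hypothetical names. The field wait_time is borrowed from the job poller example at the top of this document.

```typescript
// Hypothetical model, not part of the CDK: the ways a Wait state can be told
// how long to wait.
type WaitSpec =
  | { kind: 'seconds'; seconds: number }
  | { kind: 'secondsPath'; field: string }
  | { kind: 'timestamp'; isoTime: string };

// Work out how many seconds to wait, given the state JSON and the current time.
function resolveWaitSeconds(spec: WaitSpec, stateJson: { [key: string]: unknown }, nowMs: number): number {
  if (spec.kind === 'seconds') {
    return spec.seconds;
  }
  if (spec.kind === 'secondsPath') {
    const value = stateJson[spec.field];
    if (typeof value !== 'number' || value < 0) {
      throw new Error(`Field "${spec.field}" must hold a non-negative number`);
    }
    return value;
  }
  const targetMs = Date.parse(spec.isoTime);
  if (isNaN(targetMs)) {
    throw new Error(`Not a valid timestamp: ${spec.isoTime}`);
  }
  // A timestamp in the past means there is nothing left to wait for.
  return Math.max(0, (targetMs - nowMs) / 1000);
}

const sampleNowMs = Date.parse('2020-01-01T00:00:00Z');
const fixedWait = resolveWaitSeconds({ kind: 'seconds', seconds: 30 }, {}, sampleNowMs);
const pathWait = resolveWaitSeconds({ kind: 'secondsPath', field: 'wait_time' }, { wait_time: 10 }, sampleNowMs);
const timestampWait = resolveWaitSeconds({ kind: 'timestamp', isoTime: '2020-01-01T00:01:00Z' }, {}, sampleNowMs);
```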

Choice

A Choice state can take a different path through the workflow based on the values in the execution's JSON state.

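If you want to temporarily branch your workflow based on a condition, but have all branches come together and continue as one (similar to how an if ... then ... else works in a programming language), use the .afterwards() method.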

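If your Choice doesn't have an otherwise() and none of the conditions match the JSON state, a NoChoiceMatched error is thrown. Wrap the state machine in a Parallel state if you want to catch and recover from this.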
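
The routing and the missing-default error can be illustrated with a small evaluator in plain TypeScript. This is only a sketch of the behaviour described above, not how the CDK or the Step Functions service is implemented. ChoiceRule and pickNextState are hypothetical names, the state names are borrowed from the job poller example, and only string equality on a top-level field is supported.

```typescript
// Hypothetical model, not part of the CDK: a rule that routes on one field.
interface ChoiceRule {
  field: string;         // top-level field of the state JSON to inspect
  stringEquals: string;  // value the field must equal
  next: string;          // state to go to when the rule matches
}

// Return the first matching target, the fallback, or fail if there is neither.
function pickNextState(rules: ChoiceRule[], stateJson: { [key: string]: unknown }, otherwise?: string): string {
  for (const rule of rules) {
    if (stateJson[rule.field] === rule.stringEquals) {
      return rule.next;
    }
  }
  if (otherwise !== undefined) {
    return otherwise;
  }
  // No rule matched and no fallback was given.
  throw new Error('States.NoChoiceMatched');
}

const jobRules: ChoiceRule[] = [
  { field: 'status', stringEquals: 'FAILED', next: 'Job Failed' },
  { field: 'status', stringEquals: 'SUCCEEDED', next: 'Get Final Job Status' },
];

const afterSuccess = pickNextState(jobRules, { status: 'SUCCEEDED' }, 'Wait X Seconds');
const afterRunning = pickNextState(jobRules, { status: 'RUNNING' }, 'Wait X Seconds');
```

Calling pickNextState(jobRules, { status: 'RUNNING' }) without the third argument throws, which mirrors a Choice that has no otherwise().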

Parallel

A Parallel state executes one or more subworkflows in parallel. It can also be used to catch and recover from errors in subworkflows.
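
A sketch of those two roles in plain TypeScript: every branch receives the same input, the combined output is an array with one entry per branch, and an error in any branch can be routed to a recovery handler. The function runBranches is hypothetical and runs the branches one after another for simplicity, whereas Step Functions runs them concurrently.

```typescript
// Hypothetical model, not part of the CDK: a branch is a function of the input.
type BranchFn = (input: unknown) => unknown;

// Run every branch on the same input and collect the outputs in order.
// If a branch throws and a handler is given, the handler's result is returned instead.
function runBranches(branches: BranchFn[], input: unknown, onError?: (error: Error) => unknown): unknown {
  try {
    const outputs: unknown[] = [];
    for (const branch of branches) {
      outputs.push(branch(input));
    }
    return outputs;
  } catch (error) {
    if (onError === undefined) {
      throw error;
    }
    return onError(error instanceof Error ? error : new Error(String(error)));
  }
}

// Two branches that both succeed.
const bothOutputs = runBranches(
  [
    (value) => ({ doubled: (value as number) * 2 }),
    (value) => ({ squared: (value as number) * (value as number) }),
  ],
  3,
);

// One failing branch, recovered by the handler.
const recovered = runBranches(
  [() => { throw new Error('branch failed'); }],
  3,
  (error) => ({ recoveredFrom: error.message }),
);
```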

Succeed

Reaching a Succeed state terminates the state machine execution with a successful status.

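Fail

Reaching a Fail state terminates the state machine execution with a failure status. The fail state should report the reason for the failure. Failures can be caught by encompassing Parallel states.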

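Task Chaining

To make defining workflows as convenient (and readable in a top-to-bottom way) as writing regular programs, it is possible to chain most method invocations. In particular, the .next() method can be repeated. The result of a series of .next() calls is called a Chain, and can be used when defining the jump targets of Choice.when or Parallel.branch.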
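
The chaining idea can be sketched with a toy model in plain TypeScript. ToyState and ToyChain are hypothetical stand-ins, not the CDK classes: each .next() call links the current end of the chain to the new state and returns a chain, so calls can be repeated from top to bottom.

```typescript
// Hypothetical toy model, not the CDK classes.
class ToyState {
  public nextState?: ToyState;

  constructor(public readonly id: string) {}

  // Start a chain at this state and link it to the following one.
  public next(following: ToyState): ToyChain {
    return new ToyChain(this, this).next(following);
  }
}

class ToyChain {
  constructor(public readonly start: ToyState, private readonly end: ToyState) {}

  // Link the current end of the chain to the new state and return the longer chain.
  public next(following: ToyState): ToyChain {
    this.end.nextState = following;
    return new ToyChain(this.start, following);
  }

  // List the state IDs in execution order (assumes the chain has no loops).
  public describe(): string[] {
    const ids: string[] = [];
    let cursor: ToyState | undefined = this.start;
    while (cursor !== undefined) {
      ids.push(cursor.id);
      cursor = cursor.nextState;
    }
    return ids;
  }
}

const toyChain = new ToyState('Submit Job')
  .next(new ToyState('Wait X Seconds'))
  .next(new ToyState('Get Job Status'));

// The IDs come back in the order the states were chained.
const toyOrder = toyChain.describe();
```

Returning a chain object rather than the last state is what lets the whole sequence be passed around as a single value, for instance as the target of a branch.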

If you don't like the visual look of starting a chain directly off the first step, you can use Chain.start.

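State Machine Fragments

It is possible to define reusable (or abstracted) mini state machines by defining a construct that implements IChainable, which requires you to define two fields:

  • startState: State, representing the entry point into this state machine.
  • endStates: INextable[], representing the (one or more) states that outgoing transitions will be added to if you chain onto the fragment.

Since states are named after their construct IDs, you may need to prefix the IDs of states if you plan to instantiate the same state machine fragment multiple times (otherwise all states in every instantiation would have the same name).

The class StateMachineFragment contains some helper functions (like prefixStates()) to make this easier. If you define your state machine as a subclass of it, these helpers are convenient to use.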
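
To see why prefixing matters, the sketch below renames the states of a small fragment so that two copies can coexist without clashing names. It works on a plain map from state name to next-state name. prefixStateNames is a hypothetical helper written for this illustration and is not the CDK's prefixStates() implementation.

```typescript
// Hypothetical model, not part of the CDK: map from state name to the name of
// the next state (null marks an end state).
type FragmentStates = { [stateName: string]: string | null };

// Rename every state and every reference to it with the given prefix.
function prefixStateNames(states: FragmentStates, prefix: string): FragmentStates {
  const renamed: FragmentStates = {};
  for (const stateName of Object.keys(states)) {
    const nextName = states[stateName];
    renamed[prefix + stateName] = nextName === null ? null : prefix + nextName;
  }
  return renamed;
}

const jobFragment: FragmentStates = { Submit: 'Check', Check: null };

// Two instantiations of the same fragment now have distinct state names.
const firstCopy = prefixStateNames(jobFragment, 'First ');
const secondCopy = prefixStateNames(jobFragment, 'Second ');
const allNames = Object.keys(firstCopy).concat(Object.keys(secondCopy));
```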

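Activity

Activities represent work that is done on some non-Lambda worker pool. The Step Functions workflow submits work to this Activity, and a worker pool that you run yourself, probably on EC2, pulls jobs from the Activity and submits the results of individual jobs back.

You need the Activity's ARN to do so, so if you use Activities be sure to pass the Activity ARN into your worker pool.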

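Metrics

Task objects expose various metrics on the execution of that particular task. For example, you can create an alarm that fires when a particular task fails.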

There are also metrics on the complete state machine.

There are also metrics on the capacity of all state machines in your account.

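Future work

Contributions welcome:

  • [ ] A single LambdaTask class that is both a Lambda and a Task in one might make for a nice API.
  • [ ] Expression parser for Conditions.
  • [ ] Simulate state machines in unit tests.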
