我应该如何在管道a完成后启动管道B,并将管道a的输出用于管道B
一段代码作为起点:
from dagster import InputDefinition, Nothing, OutputDefinition, pipeline, solid
@solid
def pipeline1_task1(context) -> Nothing:
context.log.info('in pipeline 1 task 1')
@solid(input_defs=[InputDefinition("start", Nothing)],
output_defs=[OutputDefinition(str, 'some_str')])
def pipeline1_task2(context) -> str:
context.log.info('in pipeline 1 task 2')
return 'my cool output'
@pipeline
def pipeline1():
pipeline1_task2(pipeline1_task1())
@solid(input_defs=[InputDefinition("print_str", str)])
def pipeline2_task1(context, print_str) -> Nothing:
context.log.info('in pipeline 2 task 1' + print_str)
@solid(input_defs=[InputDefinition("start", Nothing)])
def pipeline2_task2(context) -> Nothing:
context.log.info('in pipeline 2 task 2')
@pipeline
def pipeline2():
pipeline2_task2(pipeline2_task1())
if __name__ == '__main__':
# run pipeline 1
# store outputs
# call pipeline 2 using the above outputs
这里我们有三个管道:pipeline1
有两个实体,可能执行我们希望执行的任何操作,并返回第二个实体的输出pipeline2
应该使用pipeline1_task2
的输出,最终执行另一项工作并打印第一条管道的输出
我应该如何“连接”这两条管道
使一条管道在另一条管道之后执行的一种方法是通过传感器。在Dagster中执行此操作的推荐方法是使用“资产传感器”。第一条管道中的固体产生
AssetMaterialization
,第二条管道中的传感器等待该资产被物化下面是一个例子:https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#asset-sensors
在玩了一会儿之后,我找到了以下解决方案(在我看来不是很优雅,但至少它是有效的):
所以。。。在这里,我定义了
pipeline3
,而不是在底部条件中执行所有操作。管道3只有一个实体,它执行pipeline1
并获取实体pipeline1_task2
的输出。然后,它创建一个包含该输出的配置some_str
,并将该配置传递给第二个管道的execute_pipeline
在这里,我们还定义了一个
@repository
函数,Dagster需要这个函数来确定所有三条管道都是一个整体的一部分整个事情在
dagit
中很好地可视化。尽管每个管道与其他管道分别显示,但这三个管道显示在一个存储库中(如代码中定义的)。相关问题 更多 >
编程相关推荐