<p>After playing around for a while, I came up with the following solution (not very elegant in my opinion, but at least it works):</p>
<pre><code>from dagster import (InputDefinition, OutputDefinition,
                     execute_pipeline, pipeline, solid, Nothing, repository)


@solid
def pipeline1_task1(context) -> Nothing:
    context.log.info('in pipeline 1 task 1')


# "start" is a Nothing input: it only orders this solid after pipeline1_task1.
@solid(input_defs=[InputDefinition("start", Nothing)],
       output_defs=[OutputDefinition(str, 'some_str')])
def pipeline1_task2(context) -> str:
    context.log.info('in pipeline 1 task 2')
    return '\n\n\nmy cool output\n\n\n'


@pipeline
def pipeline1():
    pipeline1_task2(pipeline1_task1())


# print_str is not wired to an upstream solid; it is supplied through run_config.
@solid(input_defs=[InputDefinition("print_str", str)])
def pipeline2_task1(context, print_str) -> Nothing:
    context.log.info('in pipeline 2 task 1' + print_str)


@solid(input_defs=[InputDefinition("start", Nothing)])
def pipeline2_task2(context) -> Nothing:
    context.log.info('in pipeline 2 task 2')


@pipeline
def pipeline2():
    pipeline2_task2(pipeline2_task1())


@solid
def run_pipelines(context):
    # Run pipeline1 and read the 'some_str' output of pipeline1_task2.
    pout = execute_pipeline(pipeline1)
    some_str = pout.result_for_solid('pipeline1_task2')
    # Pass that value to pipeline2 as the print_str input of pipeline2_task1.
    conf = {'solids': {'pipeline2_task1': {'inputs': {'print_str': some_str.output_value('some_str')}}}}
    execute_pipeline(pipeline2, run_config=conf)


@pipeline
def pipeline3():
    run_pipelines()


@repository
def repo():
    return [pipeline1, pipeline2, pipeline3]


if __name__ == '__main__':
    execute_pipeline(pipeline3)
</code></pre>
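<p>So... here I define <code>pipeline3</code> instead of doing everything in the <code>if __name__ == '__main__'</code> block at the bottom. <code>pipeline3</code> has a single solid, which executes <code>pipeline1</code> and fetches the output of the solid <code>pipeline1_task2</code>. It then builds a config containing that output, <code>some_str</code>, and passes the config to <code>execute_pipeline</code> for the second pipeline.</p>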
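<p>The config-building step inside <code>run_pipelines</code> could be factored into a small helper. This is only a sketch: <code>build_input_config</code> is a hypothetical name, not part of Dagster, and it assumes the same <code>solids</code> / <code>inputs</code> config layout that the code above builds by hand.</p>
<pre><code>def build_input_config(solid_name, input_name, value):
    """Return a run_config dict that supplies one solid input by value.

    Hypothetical helper (not a Dagster API); it mirrors the nested dict
    written out by hand in run_pipelines.
    """
    return {"solids": {solid_name: {"inputs": {input_name: value}}}}


# Same structure as the hand-written conf, here with a placeholder value.
conf = build_input_config("pipeline2_task1", "print_str", "my cool output")
</code></pre>
<p>Inside <code>run_pipelines</code>, the placeholder would be replaced by <code>some_str.output_value('some_str')</code> and the result passed as <code>execute_pipeline(pipeline2, run_config=conf)</code>.</p>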
<p>Here we also define a <code>@repository</code> function, which Dagster needs in order to know that all three pipelines belong together.</p>
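<p>The whole thing is visualized nicely in <code>dagit</code>. Although each pipeline is displayed separately from the others, all three appear in one repository (as defined in the code).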
<a href="https://i.stack.imgur.com/ux961.png" rel="nofollow noreferrer"><img src="https://i.stack.imgur.com/ux961.png" alt="enter image description here"/></a></p>