如何在气流中一次运行相同的dag两次

2024-04-19 01:56:37 发布

您现在位置:Python中文网/ 问答频道 /正文

我对气流是完全陌生的。我有一个要求,即我必须运行两个EMR作业。目前我有一个python脚本,它依赖于一些输入文件,如果存在,它会触发一个EMR作业

我的新要求是,我将需要不同的输入文件(相同类型),这两个文件将被输入到emr作业中,在这两种情况下,spark将做相同的事情,但只有输入文件不同

create_job_workflow = EmrCreateJobFlowOperator(
    task_id='some-task',
    job_flow_overrides=job_flow_args,
    aws_conn_id=aws_conn,
    emr_conn_id=emr_conn,
    dag=dag
)

我可以通过只更改spark submit中的输入文件来运行两个相同的dag来实现这一点吗?基本上,每当我执行“触发dag”时,它将使用两个不同的输入文件并在两个不同的emr集群中触发两个不同的emr作业。或者你能为我提供一些最佳实践吗?或者通过改变最大活动\u运行次数=2,如何实现


Tags: 文件脚本awsidtask作业jobconn
1条回答
网友
1楼 · 发布于 2024-04-19 01:56:37

最佳实践将是为其设置两个不同的任务。通过设置max_active_runs=2,您只需将并发dag_运行的数量限制为2。您可以借助任何数据结构来设置任务的配置、迭代并基于每个属性构建任务

你可以做的另一件事是:

您可以接收文件名作为dag的有效负载 像这样访问它:context['dag_run'].conf.get('filename'))

并使用触发器dag_run操作符重新触发相同的dag,用另一个文件更新所需的有效负载

相关问题 更多 >