在气流中,我面临的问题是我需要将job_flow_id
传递到我的emr步骤之一。我能够从操作符中检索job_flow_id
,但是当我要创建提交到集群的步骤时,task_instance
值不正确。
我有以下代码:
def issue_step(name, args):
return [
{
"Name": name,
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "s3://....",
"Args": args
}
}
]
dag = DAG('example',
description='My dag',
schedule_interval='0 8 * * 6',
dagrun_timeout=timedelta(days=2))
try:
create_emr = EmrCreateJobFlowOperator(
task_id='create_job_flow',
aws_conn_id='aws_default',
dag=dag
)
load_data_steps = issue_step('load', ['arg1', 'arg2'])
load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id')
load_data_steps[0]["HadoopJarStep"]["Args"].append(
"{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}") # the value here is not exchanged with the actual job_flow_id
load_data = EmrAddStepsOperator(
task_id='load_data',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", # this is correctly exchanged with the job_flow_id - same for the others
aws_conn_id='aws_default',
steps=load_data_steps,
dag=dag
)
check_load_data = EmrStepSensor(
task_id='watch_load_data',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
dag=dag
)
create_emr_recommendations >> load_data
load_data >> check_load_data
check_load_data >> cluster_remover
except AirflowException as ae:
print ae.message
问题是,当我检查EMR时,我没有在load_data
步骤中看到{
如何在step函数中获得实际值?在
谢谢,节日快乐
我发现气流存储库中有关于this的PR。问题是
EmrAddStepsOperator
中的步骤没有模板。为了克服这个问题,我做了以下工作:EmrAddStepsOperator
继承的自定义运算符这里是自定义运算符的代码和文件
custom_emr_add_step_operator.py
中的插件(见下面的树)在我的DAG文件中,我是这样调用插件的
^{pr2}$我的项目和插件的结构如下所示:
如果您使用的是一个IDE,比如PyCharm,它会抱怨,因为它说找不到模块。但当你运行气流时,这个问题就不会出现了。 还要记住,在你的
airflow.cfg
中,你要指向右边的plugins
文件夹,这样气流就可以读取你新创建的插件了。在相关问题 更多 >
编程相关推荐