EMR-op中的气流任务实例

2024-05-23 15:19:48 发布

您现在位置:Python中文网/ 问答频道 /正文

在气流中,我面临的问题是我需要将job_flow_id传递到我的emr步骤之一。我能够从操作符中检索job_flow_id,但是当我要创建提交到集群的步骤时,task_instance值不正确。 我有以下代码:

def issue_step(name, args):
    return [
        {
            "Name": name,
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "s3://....",
                "Args": args
            }
        }
    ]

dag = DAG('example',
          description='My dag',
          schedule_interval='0 8 * * 6',
          dagrun_timeout=timedelta(days=2))

try:

    create_emr = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        aws_conn_id='aws_default',        
        dag=dag
    )

    load_data_steps = issue_step('load', ['arg1', 'arg2'])

    load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id')
    load_data_steps[0]["HadoopJarStep"]["Args"].append(
        "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}") # the value here is not exchanged with the actual job_flow_id

    load_data = EmrAddStepsOperator(
        task_id='load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",  # this is correctly exchanged with the job_flow_id - same for the others
        aws_conn_id='aws_default',
        steps=load_data_steps,
        dag=dag
    )

    check_load_data = EmrStepSensor(
        task_id='watch_load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        dag=dag
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        dag=dag
    )

    create_emr_recommendations >> load_data
    load_data >> check_load_data
    check_load_data >> cluster_remover

except AirflowException as ae:
    print ae.message

问题是,当我检查EMR时,我没有在load_data步骤中看到{},而是看到了{},这导致我的步骤失败。在

如何在step函数中获得实际值?在

谢谢,节日快乐


Tags: instanceawsidtaskdatareturnvaluecreate
1条回答
网友
1楼 · 发布于 2024-05-23 15:19:48

我发现气流存储库中有关于this的PR。问题是EmrAddStepsOperator中的步骤没有模板。为了克服这个问题,我做了以下工作:

  • 创建了从EmrAddStepsOperator继承的自定义运算符
  • 已将此运算符添加为插件
  • 在我的DAG文件里打电话给新来的接线员

这里是自定义运算符的代码和文件custom_emr_add_step_operator.py中的插件(见下面的树)

from __future__ import division, absolute_import, print_function

from airflow.plugins_manager import AirflowPlugin
from airflow.utils import apply_defaults

from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator


class CustomEmrAddStepsOperator(EmrAddStepsOperator):
    template_fields = ['job_flow_id', 'steps'] # override with steps to solve the issue above

    @apply_defaults
    def __init__(
            self,
            *args, **kwargs):
        super(CustomEmrAddStepsOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        super(CustomEmrAddStepsOperator, self).execute(context=context)


# Defining the plugin class
class CustomPlugin(AirflowPlugin):
    name = "custom_plugin"
    operators = [CustomEmrAddStepsOperator]

在我的DAG文件中,我是这样调用插件的

^{pr2}$

我的项目和插件的结构如下所示:

├── config
│   └── airflow.cfg
├── dags
│   ├── __init__.py
│   └── my_dag.py
├── plugins
│   ├── __init__.py
│   └── operators
│       ├── __init__.py
│       └── custom_emr_add_step_operator.py
└── requirements.txt

如果您使用的是一个IDE,比如PyCharm,它会抱怨,因为它说找不到模块。但当你运行气流时,这个问题就不会出现了。 还要记住,在你的airflow.cfg中,你要指向右边的plugins文件夹,这样气流就可以读取你新创建的插件了。在

相关问题 更多 >