为什么在Airflow/Google Composer中尝试使用DAG.get_dagrun()时会出现暂时性错误?

2024-05-29 03:07:48 发布

您现在位置:Python中文网/ 问答频道 /正文

我们一直在寻找访问dag运行配置JSON的方法,并根据实际情况动态构建实际的dag和参考底图任务

由于Jinja模板对我的使用有一定的限制,我选择使用“香草”python,使用函数来构建我的任务

这一切的核心是能够访问配置JSON,我在这里找到了如何使用它:https://stackoverflow.com/a/68455786/5687904

然而,由于我使用的是Airflow 1.10.12(Composer 1.13.3),我不得不使用旧的/不推荐的属性来编辑上面的内容,因此我得到的是:

conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf

我在一个新的DAG中使用它进行测试,这里是一个剥离任何私有数据的最小工作示例:

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import Variable
from dependencies.airflow_utils import (
    DBT_IMAGE
)
from dependencies.kube_secrets import (
    GIT_DATA_TESTS_PRIVATE_KEY
)
# Default arguments for the DAG
default_args = {
    "depends_on_past": False,
    "owner": "airflow",
    "retries": 0,
    "start_date": datetime(2021, 5, 7, 0, 0, 0),
    'dataflow_default_options': {
        'project': 'my-gcp_project',
        'region': 'europe-west1'
        }
}

# Create the DAG
dag = DAG("test_conf_strings2", default_args=default_args, schedule_interval=None)
# DBT task creation function
conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf
def dynamic_full_refresh_strings(conf, arguments):  
    if conf.get("full-refresh") and 'dbt snapshot' in arguments:
        return ' --vars "full-refresh: true"'
    elif conf.get("full-refresh"):
        return conf.get("full-refresh")
    else:
        return ""

def task_dbt_run(conf, name, arguments, **kwargs):
    return KubernetesPodOperator(
    image=DBT_IMAGE,
    task_id="dbt_run_{}".format(name),
    name="dbt_run_{}".format(name),
    secrets=[
        GIT_DATA_TESTS_PRIVATE_KEY,
    ],
    startup_timeout_seconds=540,
    arguments=[arguments + dynamic_full_refresh_strings(conf, arguments)],
    dag=dag,
    get_logs=True,
    image_pull_policy="Always",
    resources={"request_memory": "512Mi", "request_cpu": "250m"},
    retries=3,
    namespace="default",
    cmds=["/bin/bash", "-c"]
)

# DBT commands
dbt_bqtoscore = f"""
    {clone_repo_simplified_cmd} &&
    cd bigqueryprocessing/data &&
    dbt run --profiles-dir .dbt --models execution_engine_filter"""

# Create all tasks for the dag
dbt_run_bqtoscore = task_dbt_run(conf, "bqtoscore", dbt_bqtoscore)

# Task dependencies setting
dbt_run_bqtoscore

然而,当我尝试将此逻辑添加到我的主DAG时,我开始得到'NoneType' object has no attribute 'get'

在像疯子一样检查了所有东西并做了大量的diffchecker之后,我确认没有任何区别

为了确保我不会完全发疯,我甚至复制了我的工作测试DAG,并将其名称改为其他名称,这样就不会与原始名称冲突。 我又犯了错误,基本上是dag的1:1副本

因此,这里发生的情况是,根据错误判断,conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf的相同代码在dag中产生不同的结果,其唯一区别是dag名称。 在我的工作测试中,我得到了正确的JSON,如果没有传递,那么就没有错误。 但在出错的情况下,是“无”导致了问题

有人知道这里会发生什么吗? 或者至少是关于我应该做哪些测试/调试来深入挖掘的想法


Tags: runfromimportdefaultgetdateconfarguments

热门问题