我们一直在寻找访问dag运行配置JSON的方法,并根据实际情况动态构建实际的dag和参考底图任务
由于Jinja模板对我的使用有一定的限制,我选择使用“香草”python,使用函数来构建我的任务
这一切的核心是能够访问配置JSON,我在这里找到了如何使用它:https://stackoverflow.com/a/68455786/5687904
然而,由于我使用的是Airflow 1.10.12(Composer 1.13.3),我不得不使用旧的/不推荐的属性来编辑上面的内容,因此我得到的是:
conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf
我在一个新的DAG中使用它进行测试,这里是一个剥离任何私有数据的最小工作示例:
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import Variable
from dependencies.airflow_utils import (
DBT_IMAGE
)
from dependencies.kube_secrets import (
GIT_DATA_TESTS_PRIVATE_KEY
)
# Default arguments for the DAG
default_args = {
"depends_on_past": False,
"owner": "airflow",
"retries": 0,
"start_date": datetime(2021, 5, 7, 0, 0, 0),
'dataflow_default_options': {
'project': 'my-gcp_project',
'region': 'europe-west1'
}
}
# Create the DAG
dag = DAG("test_conf_strings2", default_args=default_args, schedule_interval=None)
# DBT task creation function
conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf
def dynamic_full_refresh_strings(conf, arguments):
if conf.get("full-refresh") and 'dbt snapshot' in arguments:
return ' --vars "full-refresh: true"'
elif conf.get("full-refresh"):
return conf.get("full-refresh")
else:
return ""
def task_dbt_run(conf, name, arguments, **kwargs):
return KubernetesPodOperator(
image=DBT_IMAGE,
task_id="dbt_run_{}".format(name),
name="dbt_run_{}".format(name),
secrets=[
GIT_DATA_TESTS_PRIVATE_KEY,
],
startup_timeout_seconds=540,
arguments=[arguments + dynamic_full_refresh_strings(conf, arguments)],
dag=dag,
get_logs=True,
image_pull_policy="Always",
resources={"request_memory": "512Mi", "request_cpu": "250m"},
retries=3,
namespace="default",
cmds=["/bin/bash", "-c"]
)
# DBT commands
dbt_bqtoscore = f"""
{clone_repo_simplified_cmd} &&
cd bigqueryprocessing/data &&
dbt run --profiles-dir .dbt --models execution_engine_filter"""
# Create all tasks for the dag
dbt_run_bqtoscore = task_dbt_run(conf, "bqtoscore", dbt_bqtoscore)
# Task dependencies setting
dbt_run_bqtoscore
然而,当我尝试将此逻辑添加到我的主DAG时,我开始得到'NoneType' object has no attribute 'get'
在像疯子一样检查了所有东西并做了大量的diffchecker之后,我确认没有任何区别
为了确保我不会完全发疯,我甚至复制了我的工作测试DAG,并将其名称改为其他名称,这样就不会与原始名称冲突。 我又犯了错误,基本上是dag的1:1副本
因此,这里发生的情况是,根据错误判断,conf = dag.get_dagrun(execution_date=dag.latest_execution_date).conf
的相同代码在dag中产生不同的结果,其唯一区别是dag名称。
在我的工作测试中,我得到了正确的JSON,如果没有传递,那么就没有错误。
但在出错的情况下,是“无”导致了问题
有人知道这里会发生什么吗? 或者至少是关于我应该做哪些测试/调试来深入挖掘的想法
目前没有回答
相关问题 更多 >
编程相关推荐