以dag为单位增加或减少秒数

2024-05-23 23:36:46 发布

您现在位置:Python中文网/ 问答频道 /正文

有没有办法在dag中增加或减少{ts_nodash}秒数? 由于{ts_nodash}获得的日期时间相同,我的输出被覆盖。有没有办法用1-2秒增加{{ts_nodash}} 就像我们可以在{{ds}}里做几天那样-

airflow.macros.ds_add(ds, days) 

ds_add('2015-01-01', 5)

Tags: add时间dsdaysmacrosairflowdagts
2条回答

有两种方法可以做到这一点

  1. 创建自己的宏,如ds_add,并将其放在plugin
  2. 在Jinja模板中使用python代码

选项2是最简单、最直接的方法

from datetime import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="trigger_dag",
    start_date=datetime(2021, 3, 10),
    catchup=True,
    schedule_interval='@once',
)

with dag:
    op = PythonOperator(
        task_id='a',
        python_callable=lambda x, y: print(x, y),
        op_args=[
            '{{ ts_nodash }}',
            '{{ execution_date.subtract(seconds=2).strftime("%Y%m%dT%H%M%S")  }}',
        ],
    )

获取ts_nodash的方式如how the context is built所示。我们基本上是在datetime对象的基础上构建的,该对象将被格式化为字符串

以下是任务的呈现模板视图

enter image description here

尝试使用钟摆日期宏,而不是日期字符串。可以操纵日期并将其返回为字符串

import pendulum
....
execution_date.subtract(seconds=2).to_datetime_string() 

相关问题 更多 >