重新运行时是否缓存全局变量

2024-04-23 18:17:06 发布

您现在位置:Python中文网/ 问答频道 /正文

我的工作如下:

import time

job_id = int(time.time())

airflow_job1 = PythonOperator(op_kwargs={"job_id" : job_id}, ...)
airflow_job2 = BashOperator(op_kwargs={"job_id" : job_id}, ...)

airflow_job1 >> airflow_job2

我知道每次启动脚本时,我都会有一个新的作业id,用于每个任务。但我想知道,如果我从中间运行脚本,比如airflow_job1失败,我修复了问题并从UI中的airflow_job1重新运行,是在重新运行中生成了新的job_id,还是airflow使用了之前的最后一个job_id


Tags: import脚本iduitime作业jobkwargs
1条回答
网友
1楼 · 发布于 2024-04-23 18:17:06

实际上,在我检查了一个简单的案例之后:

# global parameter
job_id = int(time.time())


def airflow_job1(job_id, **context):
    print("in airflow_job1, current timestamp: %s" % job_id)

def airflow_job2(job_id, **context):
    print("in airflow_job2, current timestamp: %s" % job_id)

airflow_job1 = PythonOperator(
    task_id='airflow_job1',
    provide_context=True,
    python_callable=airflow_job1,
    op_kwargs={'job_id': job_id},
    dag=globals()[dag_name]
)

airflow_job2 = PythonOperator(
    task_id='airflow_job2',
    provide_context=True,
    python_callable=airflow_job2,
    op_kwargs={'job_id': job_id},
    dag=globals()[dag_name]
)

 airflow_job1 >> airflow_job2

我发现即使在同一次运行中,airflow_job1和airflow_job2中的作业id也不同

所以结论是,我们不应该以这种方式设置全局参数,也许可以使用xcom\u pull/xcom\u push来解决这个问题

相关问题 更多 >