我的工作如下:
import time
job_id = int(time.time())
airflow_job1 = PythonOperator(op_kwargs={"job_id" : job_id}, ...)
airflow_job2 = BashOperator(op_kwargs={"job_id" : job_id}, ...)
airflow_job1 >> airflow_job2
我知道每次启动脚本时,我都会有一个新的作业id,用于每个任务。但我想知道,如果我从中间运行脚本,比如airflow_job1失败,我修复了问题并从UI中的airflow_job1重新运行,是在重新运行中生成了新的job_id,还是airflow使用了之前的最后一个job_id
实际上,在我检查了一个简单的案例之后:
我发现即使在同一次运行中,airflow_job1和airflow_job2中的作业id也不同
所以结论是,我们不应该以这种方式设置全局参数,也许可以使用xcom\u pull/xcom\u push来解决这个问题
相关问题 更多 >
编程相关推荐