我正在将数据从Postgresql导出到GCS,然后将GCS导出到BQ。DAG看起来不错。但在导出路径中,Im使用当前时间戳,如下所示
import datetime
date=str(datetime.datetime.now().strftime("%Y-%m-%d-%H-%M"))
year=date.split('-')[0]
month=date.split('-')[1]
day=date.split('-')[2]
hour=date.split('-')[3]
minutes=date.split('-')[4]
export_suffix = year + '/' + month + '/' + day + '/' + hour + '/' + minutes
因此TASK1(导出到GCS)将数据导出到此路径中
gs://bucket/jobs/export_tbl/2020/09/27/07/12/file.csv
但导出过程花了2分钟,然后GCS到BQ dag开始,其失败。当我检查日志时,它显示未找到GCS URI,并且正在查找路径
gs://bucket/jobs/export_tbl/2020/09/27/07/14/file.csv
我已经在DAG的顶部声明了变量
由于多种原因,您不应该这样设置路径:
我猜您是在寻找execution_date,这是DAG及其任务实例运行的逻辑日期和时间。气流支持通过macros访问此文件
相关问题 更多 >
编程相关推荐