将时间戳声明为变量,但为每个任务获取不同的时间戳

2024-05-15 02:02:01 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在将数据从Postgresql导出到GCS,然后将GCS导出到BQ。DAG看起来不错。但在导出路径中,Im使用当前时间戳,如下所示

import datetime

date=str(datetime.datetime.now().strftime("%Y-%m-%d-%H-%M"))
year=date.split('-')[0]
month=date.split('-')[1]
day=date.split('-')[2]
hour=date.split('-')[3]
minutes=date.split('-')[4]
export_suffix = year + '/' + month + '/' + day + '/' + hour + '/' + minutes

因此TASK1(导出到GCS)将数据导出到此路径中

gs://bucket/jobs/export_tbl/2020/09/27/07/12/file.csv

但导出过程花了2分钟,然后GCS到BQ dag开始,其失败。当我检查日志时,它显示未找到GCS URI,并且正在查找路径

gs://bucket/jobs/export_tbl/2020/09/27/07/14/file.csv

我已经在DAG的顶部声明了变量


Tags: 数据路径gsdatetimedateexportyearbq
1条回答
网友
1楼 · 发布于 2024-05-15 02:02:01

由于多种原因,您不应该这样设置路径:

  • 这听起来像是将此代码段放在python文件的顶部,这意味着每次气流通过dag文件夹解析时都会执行此代码段(然后它会重新分配变量)
  • 如果计划重新运行任何dag运行,则输出路径将不同。dag不是独立的,这使得调试变得更加困难;重现结果

我猜您是在寻找execution_date,这是DAG及其任务实例运行的逻辑日期和时间。气流支持通过macros访问此文件

相关问题 更多 >

    热门问题