Checking whether a file exists in a Google Cloud Storage bucket with Apache Airflow?

Published 2024-04-19 03:55:33


I have a DAG that fetches the result of a script from a Google Cloud Storage bucket, loads it into a table in Google BigQuery, and then deletes the file from the bucket.

I want the DAG to check every hour over the weekend. Right now I am using GoogleCloudStorageToBigQueryOperator. If the file does not exist, the DAG fails. Is there a way to configure the DAG so that it does not fail when the file is missing? Perhaps with a try/except?


1 Answer

Posted on 2024-04-19 03:55:33

You can use the GCSObjectExistenceSensor from the Google provider package to verify that the file exists before running the downstream tasks:

from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

gcs_object_exists = GCSObjectExistenceSensor(
    task_id="gcs_object_exists_task",
    bucket=BUCKET_1,
    object=PATH_TO_UPLOAD_FILE,
    mode='poke',
)

You can check the official example here. Keep in mind that this sensor extends BaseSensorOperator, so you can set parameters such as poke_interval, timeout, and mode to suit your needs:

  • soft_fail (bool) – Set to True to mark the task as SKIPPED on failure
  • poke_interval (float) – Time in seconds that the job should wait in between each try
  • timeout (float) – Time, in seconds before the task times out and fails.
  • mode (str) – How the sensor operates. Options are: { poke | reschedule }, default is poke. When set to poke the sensor is taking up a worker slot for its whole execution time and sleeps between pokes. Use this mode if the expected runtime of the sensor is short or if a short poke interval is required. Note that the sensor will hold onto a worker slot and a pool slot for the duration of the sensor’s runtime in this mode. When set to reschedule the sensor task frees the worker slot when the criteria is not yet met and it’s rescheduled at a later time. Use this mode if the time before the criteria is met is expected to be quite long. The poke interval should be more than one minute to prevent too much load on the scheduler.
  • exponential_backoff (bool) – allow progressive longer waits between pokes by using exponential backoff algorithm
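For intuition, the way these parameters interact can be sketched in plain Python. This is a simplified illustration of a poke-mode sensor loop, not Airflow's actual implementation; `run_sensor` and `fake_poke` are made-up names for the sketch:

```python
import time

def run_sensor(poke, poke_interval=60.0, timeout=3600.0, soft_fail=False):
    """Simplified sketch of a poke-mode sensor loop (not Airflow's real code).

    Calls poke() repeatedly, sleeping poke_interval seconds between
    attempts. If timeout elapses before poke() returns True, the task
    is failed, or skipped when soft_fail=True.
    """
    started = time.monotonic()
    while not poke():
        if time.monotonic() - started > timeout:
            return "skipped" if soft_fail else "failed"
        time.sleep(poke_interval)
    return "success"

# Example: a poke callable that only succeeds on the third attempt,
# standing in for "the file has now appeared in the bucket".
attempts = {"n": 0}
def fake_poke():
    attempts["n"] += 1
    return attempts["n"] >= 3

print(run_sensor(fake_poke, poke_interval=0.01, timeout=5.0))  # prints: success
```

This also shows why soft_fail answers the original question: with soft_fail=True, a missing file leads to a SKIPPED sensor task rather than a failed DAG run.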

source
