通过气流将文件从GCS复制到Google Drive

2024-05-15 10:43:36 发布

您现在位置:Python中文网/ 问答频道 /正文

我想实例化一个任务(通过airflow),将google云存储中存储桶中的文件复制到驱动器

我使用位于以下位置的专用操作员:

from airflow.contrib.operators.gcs_to_gdrive_operator import GcsToGDriveOperator

然后操作员:

copy_files = GcsToGDriveOperator(
        task_id="copy_files",
        source_bucket=GCS_BUCKET_ID,
        source_object='{}/{}/forecasted/*'.format(COUNTRY_TRIGRAM, PRED_START_RANGE),
        destination_object="content/drive/Shared Drives/FORECAST_TEST",
        gcp_conn_id="airflow_service_account_conn_w_drive"
    )

任务已成功,但不要复制“目标对象”中的文件,这是我不确定要放入的部分


Tags: 文件实例idsourceobjectgooglefilesdrive
1条回答
网友
1楼 · 发布于 2024-05-15 10:43:36

回顾AirflowGcsToGDriveOperator源代码,我假设Airflow利用gcs_hook.download()方法从GCS下载文件并gdrive_hook.upload_file()将这些对象上载到目标Gdrive位置

如上所述,gcs_hook.download()方法记录成功操作结果的每个操作:

self.log.info('File downloaded to %s', filename)

类似地,gdrive_hook.upload_file()将每个文件上载迭代写入日志消息:

self.log.info("File %s uploaded to gdrive://%s.", local_location, remote_location)

即使任务成功了,我相信您也可以在特定任务的logs中捕获上述事件,查找从GcsToGDriveOperator()定义派生的实际源和目标位置路径

可考虑将气流工日志检查连接到GKE集群并启动^ {CD4>}命令行工具:

kubectl logs deployment/airflow-worker -n $(kubectl get ns| grep composer*| awk '{print $1}') -c airflow-worker | grep 'Executing copy'

相关问题 更多 >