获取异常“没有名为'afflow.providers.sftp'的模块”

2024-04-26 18:44:09 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试将文件从SFTP复制到google云存储

  • 作曲家版本=1.16.12
  • 气流版本=1.10.15

执行获取异常时No module named 'airflow.providers.sftp'。 如果有人能给点建议,我将不胜感激

代码片段是:

import os
import airflow
from airflow import DAG
from airflow import models
from airflow.operators import python_operator
from airflow.providers.google.cloud.transfers.sftp_to_gcs import SFTPToGCSOperator
from airflow.utils.dates import days_ago


with models.DAG("test_ssh_to_gcs", start_date=days_ago(1), schedule_interval=None) as dag:

    copy_file_from_ssh_to_gcs = SFTPToGCSOperator(
        task_id="file-copy-ssh-to-gcs",
        source_path="/ ",
        destination_bucket='test_sftp_to_gcs',
        destination_path="test/test.csv",
        gcp_conn_id="google_cloud_default",
        sftp_conn_id="sftp_test",
    )

copy_file_from_ssh_to_gcs

Tags: tofromtestimport版本idgooglessh
3条回答

由于SFTPToGCSOperator在发动机罩下使用airflow.providers.sftp.operators.SFTPOperator而导致该错误,该错误出现在气流中>;=2.0.0.

坏消息是需要升级气流版本以使用airflow.providers.google.cloud.transfers.sftp_to_gcs.SFTPToGCSOperator

如果不希望/无法升级气流,可以创建DAG链接两个操作符:

^{tb1}$

这应该可以做到:


LOCALFILE = '/tmp/kk'

with models.DAG("test_ssh_to_gcs", start_date=days_ago(1), schedule_interval=None) as dag:

   download_sftp = SFTPOperator(
      task_id = 'part1_sftp_download_to_local',
      ssh_conn_id="sftp_test",
      local_file=LOCALFILE,
      remote_file='',
      operation='get')

   gcp_upload = FileToGoogleCloudStorageOperator(
      task_id='part2_upload_to_gcs',
      bucket='test_sftp_to_gcs',
      src=LOCALFILE,
      dst="test/test.csv",
      google_cloud_storage_conn_id="google_cloud_default" # configured in Airflow
   ) 

   sftp_download >> gcp_upload 

使用airflow 1.10,您可以安装backported packages

对于您的情况,需要将以下内容添加到composer群集:

1-apache airflow后台端口提供商google

2-apache airflow后端口提供程序sftp

3-apache airflow后端口提供程序ssh

首先,您是否尝试过用pip install apache-airflow-providers-sftp安装package
还要注意所引用的文档版本。在Airflow 2.0中,一些软件包已经moved

相关问题 更多 >