Apache气流中的动态FTP传感器

2024-06-16 12:27:41 发布

您现在位置:Python中文网/ 问答频道 /正文

我想实现一种动态FTP传感器。使用所提供的FTP传感器,我设法使其以这种方式工作:

ftp_sensor = FTPSensor(
        task_id="detect-file-on-ftp",        
        path="./data/test.txt",
        ftp_conn_id="ftp_default",
        poke_interval=5,
        dag=dag,
    )

而且效果很好。但我需要传递动态路径和ftp_conn_id参数。也就是说,我在以前的任务中生成了一组新连接,在ftp_传感器任务中,我想检查我以前生成的每个新连接,如果ftp上存在文件

所以我想首先从XCom获取连接的ID。 我从XCom中的上一个任务发送它们,但似乎无法在任务之外访问XCom。 我的目标是:

active_ftp_connections = context['ti'].xcom_pull(key='active_ftps')
for conn in active_ftp_connections:
  ftp_sensor = FTPSensor(
        task_id="detect-file-on-ftp",        
        path=conn['path'],
        ftp_conn_id=conn['connection'],
        poke_interval=5,
        dag=dag,
    )

但这似乎不是一个可行的解决方案

然后我浪费了大量时间尝试创建自定义FTP传感器,以便动态传递所需的数据,但现在我得出结论,我需要传感器和操作员之间的混合,因为我需要保留戳功能,但也需要执行功能。 我想一个选择是编写一个自定义操作符,从sensor基类实现poke,但现在可能太累了,无法尝试这样做

你知道如何实现我的目标吗?我似乎在网上找不到关于这个话题的任何资料——也许只有我一个人。 如果问题不清楚,请告诉我,以便我提供更多详细信息

更新

我现在把这当作可能

def get_active_ftps(**context):
    active_ftp_connestions = context['ti'].xcom_pull(key='active_ftps')
    return active_ftp_connestions

for ftp in get_active_ftps():
    ftp_sensor = FTPSensor(
            task_id="detect-file-on-ftp",        
            path="./"+ ftp['folder'] +"/test.txt",
            ftp_conn_id=ftp['conn_id'],
            poke_interval=5,
            dag=dag,
        )

但是它抛出了一个错误:Broken DAG: [/usr/local/airflow/dags/copy_file_from_ftp.py] 'ti'


Tags: pathidtaskftp动态传感器sensorconn
1条回答
网友
1楼 · 发布于 2024-06-16 12:27:41

我是这样做的:

active_ftp_folder = Variable.get('active_ftp_folder')
active_ftp_conn_id = Variable.get('active_ftp_conn_id')
ftp_sensor = FTPSensor(
        task_id="detect-file-on-ftp",        
        path="./"+ active_ftp_folder +"/test.txt",
        ftp_conn_id=active_ftp_conn_id,
        poke_interval=5,
        dag=dag,
    )

然后让dag一次运行一个ftp帐户,因为我意识到在一个直接的非循环图中不应该有循环。。。显然

相关问题 更多 >