我想实现一种动态FTP传感器。使用所提供的FTP传感器,我设法使其以这种方式工作:
ftp_sensor = FTPSensor(
task_id="detect-file-on-ftp",
path="./data/test.txt",
ftp_conn_id="ftp_default",
poke_interval=5,
dag=dag,
)
而且效果很好。但我需要传递动态路径和ftp_conn_id参数。也就是说,我在以前的任务中生成了一组新连接,在ftp_传感器任务中,我想检查我以前生成的每个新连接,如果ftp上存在文件
所以我想首先从XCom获取连接的ID。 我从XCom中的上一个任务发送它们,但似乎无法在任务之外访问XCom。 我的目标是:
active_ftp_connections = context['ti'].xcom_pull(key='active_ftps')
for conn in active_ftp_connections:
ftp_sensor = FTPSensor(
task_id="detect-file-on-ftp",
path=conn['path'],
ftp_conn_id=conn['connection'],
poke_interval=5,
dag=dag,
)
但这似乎不是一个可行的解决方案
然后我浪费了大量时间尝试创建自定义FTP传感器,以便动态传递所需的数据,但现在我得出结论,我需要传感器和操作员之间的混合,因为我需要保留戳功能,但也需要执行功能。 我想一个选择是编写一个自定义操作符,从sensor基类实现poke,但现在可能太累了,无法尝试这样做
你知道如何实现我的目标吗?我似乎在网上找不到关于这个话题的任何资料——也许只有我一个人。 如果问题不清楚,请告诉我,以便我提供更多详细信息
更新
我现在把这当作可能
def get_active_ftps(**context):
active_ftp_connestions = context['ti'].xcom_pull(key='active_ftps')
return active_ftp_connestions
for ftp in get_active_ftps():
ftp_sensor = FTPSensor(
task_id="detect-file-on-ftp",
path="./"+ ftp['folder'] +"/test.txt",
ftp_conn_id=ftp['conn_id'],
poke_interval=5,
dag=dag,
)
但是它抛出了一个错误:Broken DAG: [/usr/local/airflow/dags/copy_file_from_ftp.py] 'ti'
我是这样做的:
然后让dag一次运行一个ftp帐户,因为我意识到在一个直接的非循环图中不应该有循环。。。显然
相关问题 更多 >
编程相关推荐