有没有办法通过REST-API获取PutSQL
处理器最后一次执行的时间戳?这样的时间戳是否存在,或者我是否可以自己构建一个
设置:我有气流来触发我的Nifi ETL,它以两个PutSQL处理器结束-完成这些之后,我需要在气流中执行其他操作
想法:我想触发第一个Nifi处理器,然后等待最后一个PutSQL处理器的last_execution_timestamp
更新
问题:
我尝试访问属性statsLastRefreshed
,但这不是最后一次执行,而是最后一次任何东西(用户/api请求)访问处理器,导致Nifi刷新处理器
s = processor["status"]["statsLastRefreshed"] # '13:13:26 CEST'
我在气流的RESTAPI文档中找不到任何东西
我看到的唯一其他选择是从Airflow向最后一个PutSQL处理器的数据库表发出请求,以查看是否有新的情况发生
Nifi适用于连续数据流。因此
Processor
stats中没有最后的执行时间尽管Nifi能够进行ETL(不是完全成熟的),但我相信如果您的流只需要文件和数据库访问,那么气流比Nifi更适合ETL过程
将气流和Nifi集成到ETL任务中有点复杂。如果可能的话,你可以考虑选择其中一个不会遇到像你在这里描述的问题。
如果您的流包含大量不同的输入、复杂的逻辑和接收大文件,那么选择仅气流并不容易
我想出了一个解决办法
在处理器中添加名为
${now()}
的自定义属性mypropertyname任何通过处理器的流文件都将具有通过处理器的时间戳作为属性
在步骤1中的处理器后面有一个
UpdateAttribute
处理器,选项(在处理器属性下)存储状态设置为Store state locally
在
UpdateAttribute
处理器中添加名为readable_property的自定义属性,并将其设置为值${'mypropertyname'}
处理器的状态现在包含最后一个流文件的值(例如,带有从步骤1开始执行
now()
方法的时间戳)返回的JSON包含以下行:
然后,您只需解析JSON中的值
注意:在前一个处理器使用
now
向流文件添加属性和流文件实际通过UpdateAttribute
处理器读取时间戳之间,会有一点延迟相关问题 更多 >
编程相关推荐