获取Nifi PutSQL处理器的最后执行时间戳

2024-04-28 15:35:56 发布

您现在位置:Python中文网/ 问答频道 /正文

有没有办法通过REST-API获取PutSQL处理器最后一次执行的时间戳?这样的时间戳是否存在,或者我是否可以自己构建一个

设置:我有气流来触发我的Nifi ETL,它以两个PutSQL处理器结束-完成这些之后,我需要在气流中执行其他操作

想法:我想触发第一个Nifi处理器,然后等待最后一个PutSQL处理器的last_execution_timestamp更新

问题: 我尝试访问属性statsLastRefreshed,但这不是最后一次执行,而是最后一次任何东西(用户/api请求)访问处理器,导致Nifi刷新处理器

s = processor["status"]["statsLastRefreshed"]  # '13:13:26 CEST'

我在气流的RESTAPI文档中找不到任何东西

我看到的唯一其他选择是从Airflow向最后一个PutSQL处理器的数据库表发出请求,以查看是否有新的情况发生


Tags: 用户restapi属性时间etl处理器timestamp
2条回答

Nifi适用于连续数据流。因此Processorstats中没有最后的执行时间

尽管Nifi能够进行ETL(不是完全成熟的),但我相信如果您的流只需要文件和数据库访问,那么气流比Nifi更适合ETL过程

将气流和Nifi集成到ETL任务中有点复杂。如果可能的话,你可以考虑选择其中一个不会遇到像你在这里描述的问题。

如果您的流包含大量不同的输入、复杂的逻辑和接收大文件,那么选择仅气流并不容易

我想出了一个解决办法

  1. 在处理器中添加名为${now()}的自定义属性mypropertyname

  2. 任何通过处理器的流文件都将具有通过处理器的时间戳作为属性

  3. 在步骤1中的处理器后面有一个UpdateAttribute处理器,选项(在处理器属性下)存储状态设置为Store state locally

  4. UpdateAttribute处理器中添加名为readable_property的自定义属性,并将其设置为值${'mypropertyname'}

处理器的状态现在包含最后一个流文件的值(例如,带有从步骤1开始执行now()方法的时间戳)

  1. 通过REST-API和URI上的Get获取有状态处理器的值(以及通过(!)的最后一个流文件的值)(例如,在气流中)

返回的JSON包含以下行:

{
"key":"readable_property"
,"value":"Wed Apr 14 11:13:40 CEST 2021"
,"clusterNodeId":"some-id-0d8eb6052"
,"clusterNodeAddress":"some-host:port-number"
}

然后,您只需解析JSON中的值

注意:在前一个处理器使用now向流文件添加属性和流文件实际通过UpdateAttribute处理器读取时间戳之间,会有一点延迟

相关问题 更多 >