使用Nifi ExecuteScript进程生成多个流文件

2024-06-01 01:17:19 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在处理一个Nifi流,在这个流中我得到了一个具有多个键值对的JSON文档。我正在使用ExecuteScript处理器和python。在

我的目标是基于JSONkeys创建各种url。键是数字的,它们看起来像这样:

keys = [10200, 10201, 10202, ...]

我想要的URL有3种类型,它们应该如下所示:

^{pr2}$

我正在尝试遍历我的keys[]并为它包含的每个数字键创建3个特定的url。我有以下代码:

从列表中读取一个数字键-->创建3个url -->输出一个流文件。在

。。。。。。然后读取列表中的下一个数字键并继续循环。。。。。在

我有下面的代码,但是当我给它一个JSON流文件时,它现在什么也不做。有人能告诉我我做错了什么吗?在

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):

  def __init__(self):
        self.parentFlowFile = None
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    obj = json.loads(text)
    flowfiles_list = [] 

    outputStream.write(bytearray(json.dumps(obj.keys(), indent=4).encode('utf-8')))


    for numerical_key in obj.keys():
      # create 1 flowfile for each numerical_key. Each flow file should have 3 url attributes 
      flowFile = session.create(self.parentFlowFile)
      if (flowFile != None):
        flowFile = session.write(flowFile, "Does not matter")
        flowFile = session.putAttribute(flowFile, "google", "http://google.com/"+ numerical_key)

        flowFile = session.putAttribute(flowFile, "google", "http://bing.com/"+ numerical_key)

        flowFile = session.putAttribute(flowFile, "google", "http://yahoo.com/"+ numerical_key)
        flowfiles_list.append(flowFile)

    for flow in flowfiles_list:
      session.transfer(flow, REL_SUCCESS)

Tags: keyfromioimportselfjsonobjurl
1条回答
网友
1楼 · 发布于 2024-06-01 01:17:19

好问题,这是流文件API回调方法的细微差别。您已经创建了StreamCallback的一个子类,但还没有检索到输入流文件,也没有使用它通过类的实例覆盖内容。在

在定义ModJSON类之后尝试以下操作:

originalFlowFile = session.get()
if(originalFlowFile != None):
    originalFlowFile = session.write(flowFile, ModJSON())
    session.remove(originalFlowFile)

这将获得一个输入流文件(或等待一个出现),然后调用StreamCallback来覆盖流文件的内容。在我的示例中,您将放弃您的输入流文件,因此如果这是您的用例的正确行为,那么您可以只扩展InputStreamCallback而不是StreamCallback并删除输出流.write(),如果您不使用outputStream进行任何操作。为此,用InputStreamCallback替换StreamCallback,并从process()方法中删除“outputStream”参数。在

在您的示例中,一旦您在上面添加了我的代码段,您将使用json.dumps文件()命令,以及创建和传输新文件,所有这些操作都指向相同的关系(成功),因此,如果文件的格式不相同,则可能会导致问题(这就是我添加会话.删除()). 如果需要原始流文件与其他流文件产生不同的关系,请考虑InvokeScriptedProcessor而不是ExecuteScript。如果您不关心处理后的输入流文件(添加URL属性完成),那么请遵循我上面的建议。如果他们都能走出同样的关系(成功),那就换掉我的会话.删除()与

^{pr2}$

查看我的executeScriptCookbook文章(part 2,共3篇),以获取Jython(和其他语言)中这些用例的更多示例:)

相关问题 更多 >