如何使用带有python的ExecuteScript从nifi中的一个传入流文件创建多个流文件

2024-04-26 17:49:53 发布

您现在位置:Python中文网/ 问答频道 /正文

在本地运行,这正是我希望它的工作方式(在位置7-10有一个包含许多不同代码的传入流文件,每个唯一代码输出一个文件),例如,如果记录1-5在位置7-10有1234,记录6在位置7-10有2345,记录7在位置7-10有1234,然后将有一个名为1234_file.txt的文件,其中包含第1-5行和第7行,第二个文件2345_file.txt将包含输入文件中的第6行:

f = open("test_comp.txt", "r")
for x in f:
    comp = x[6:10]
    print(comp)
    n = open(comp+"_file.txt","a")
    n.write(x)
    n.close()
f.close()

在nifi中,我正在尝试以下方法:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    f = open(inputStream, 'r')
    for x in f:
        comp = x[6:10]
        print("comp: ",comp)
        newFile = open(comp+"_file.txt","a")
        newFile.write(x)


flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, REL_SUCCESS)
session.commit()

它似乎得到了输入,并按预期正确地将comp存储在位置7-10,但我没有得到多个流文件(对于x[6:10]中的每个唯一字符串),并且输出的流文件是1个零字节文件

有没有想过我错过了什么


Tags: 文件代码infromimporttxtforsession
1条回答
网友
1楼 · 发布于 2024-04-26 17:49:53

您直接写入文件系统中的文件,而不是NiFi生态系统中的对象flowfiles。我建议阅读Apache NiFi Developer's Guide以了解这些模式的上下文,并查看一些Python ExecuteScript examples以查看相关的Python代码

您需要创建多个flowfile对象,将数据映射到它们,然后将它们中的所有传输到相应的关系,而不是将单个flowfile写入

是否有理由需要使用自定义Python代码而不是^{}和/或^{}处理器来完成此任务?我认为PartitionRecord可以很容易地解决你描述的问题

相关问题 更多 >