我正在使用Nifi中的处理器从Kafka获取数据并将其写入文件。脚本是用python编写的。在
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class ModJSON(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
outputStream.write(bytearray(text))
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile, ModJSON())
flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
session.transfer(flowFile, REL_SUCCESS)
session.commit()
我读过java中的这个错误,但我不明白为什么会在这里产生它。在
如果您只想将来自Kafka的消息写入一个文件,是否有什么原因不能只使用ConsumeKafka->;PutFile?在
错误是因为您只在if(flowFile)中放了两行代码!=None):如果flow file为null,则仍在第18行调用transfer,这将产生错误。在
相关问题 更多 >
编程相关推荐