我想跟踪日志文件。(不幸的是,在Nifi中,处理器TailFile并没有执行我想要的操作,因为如果删除taild文件,状态似乎会丢失)
因此,我希望使用ExecuteScript命令(Jython/python)创建一个类似的处理器。 我的主要问题是在Nifi Jython包中用Python高效地跟踪文件
> from org.apache.nifi.components.state import Scope from
> org.apache.commons.io import IOUtils from java.nio.charset import
> StandardCharsets from java.io import BufferedReader, InputStreamReader
> from org.apache.nifi.processor.io import StreamCallback
>
> # Define a subclass of StreamCallback for use in session.write() class PyStreamCallback(StreamCallback): def __init__(self, old_position,
> line_count):
> self.__line_count = int(line_count)
> self.__old_position = int(old_position) def process(self, inputStream, outputStream):
> reader = BufferedReader(InputStreamReader(inputStream))
> try:
> # Loop indefinitely
> i = 0
> tailled =[]
> while True:
> # Get the next line
> line = reader.readLine()
> # If there is no more content, break out of the loop
> if line is None:
> break
> else:
> i += 1
> if i > self.__old_position:
> tailled.append(line)
> outputs = '\n'.join(tailled)
> outputStream.write(bytearray(outputs.encode('utf-8')))
>
> finally:
> if reader is not None:
> reader.close()
>
>
> # end class
>
>
>
> stateManager = context.stateManager
>
> stateMap = stateManager.getState(Scope.LOCAL) oldMap
> stateManager.getState(Scope.LOCAL).toMap() flowFile = session.get()
>
> if flowFile != None:
>
> unique_name = flowFile.getAttribute("absolute.path") + flowFile.getAttribute("filename")
> line_count = flowFile.getAttribute("text.line.count")
> new_map = {}
> new_map.update(oldMap)
> new_map.update({unique_name: line_count})
> if stateMap.version == -1:
> stateManager.setState(new_map, Scope.LOCAL)
> old_position = stateMap.get(unique_name)
> flowFile = session.putAttribute(flowFile, "old_position", old_position)
> session.transfer(flowFile, REL_SUCCESS)
> else:
> if stateMap.get(unique_name):
> old_position = stateMap.get(unique_name)
> flowFile = session.putAttribute(flowFile, "old_position", old_position)
> flowFile = session.write(flowFile, PyStreamCallback(old_position, line_count))
> session.transfer(flowFile, REL_SUCCESS)
> stateManager.replace(stateMap, new_map, Scope.LOCAL)
它的工作原理是注册状态(根据路径名:因此是上次读取的旧位置)并读取BufferedReader中的流文件(流)行。我的流文件可能相当大(XXX MB),因此这种方法效率很低。“Output”变量,很快就会变得很大,而我的Java堆(1.5GB)对于一个小文件(180MB)已经用完了
# Loop indefinitely i = 0 tailled =[] while True: # Get the next line line = reader.readLine() # If there is no more content, break out of the loop if line is None: break else: i += 1 if i > self.__old_position: tailled.append(line) outputs = '\n'.join(tailled)
我知道文件中的行数和我需要从哪里开始(在我的例子中,这是一个流),您有没有想法以最好的方式获取最后几行? 我愿意从python转换
我想添加另一个处理器来执行读取(ExecuteStreamCommand)
谢谢你的帮助
目前没有回答
相关问题 更多 >
编程相关推荐