Nifi ExecuteScript/tail文件

2024-10-03 09:14:55 发布

您现在位置:Python中文网/ 问答频道 /正文

我想跟踪日志文件。(不幸的是,在Nifi中,处理器TailFile并没有执行我想要的操作,因为如果删除taild文件,状态似乎会丢失)

因此,我希望使用ExecuteScript命令(Jython/python)创建一个类似的处理器。 我的主要问题是在Nifi Jython包中用Python高效地跟踪文件

> from org.apache.nifi.components.state import Scope from
> org.apache.commons.io import IOUtils from java.nio.charset import
> StandardCharsets from java.io import BufferedReader, InputStreamReader
> from org.apache.nifi.processor.io import StreamCallback
> 
> # Define a subclass of StreamCallback for use in session.write() class PyStreamCallback(StreamCallback):   def __init__(self, old_position,
> line_count):
>         self.__line_count = int(line_count)
>         self.__old_position = int(old_position)   def process(self, inputStream, outputStream):
>     reader = BufferedReader(InputStreamReader(inputStream))
>     try:
>         # Loop indefinitely
>         i = 0
>         tailled =[]
>         while True:
>             # Get the next line
>             line = reader.readLine()
>             # If there is no more content, break out of the loop
>             if line is None:
>                 break
>             else:
>                 i += 1
>             if i > self.__old_position:
>                 tailled.append(line)
>         outputs = '\n'.join(tailled)
>         outputStream.write(bytearray(outputs.encode('utf-8')))
> 
>     finally:
>         if reader is not None:
>             reader.close()
> 
> 
> # end class
> 
> 
> 
> stateManager = context.stateManager
> 
> stateMap = stateManager.getState(Scope.LOCAL) oldMap 
> stateManager.getState(Scope.LOCAL).toMap() flowFile = session.get()
> 
> if flowFile != None:
> 
>     unique_name = flowFile.getAttribute("absolute.path") + flowFile.getAttribute("filename")
>     line_count = flowFile.getAttribute("text.line.count")
>     new_map = {}
>     new_map.update(oldMap)
>     new_map.update({unique_name: line_count})
>     if stateMap.version == -1:
>         stateManager.setState(new_map, Scope.LOCAL)
>         old_position = stateMap.get(unique_name)
>         flowFile = session.putAttribute(flowFile, "old_position", old_position)
>         session.transfer(flowFile, REL_SUCCESS)
>     else:
>         if stateMap.get(unique_name):
>             old_position = stateMap.get(unique_name)
>             flowFile = session.putAttribute(flowFile, "old_position", old_position)
>             flowFile = session.write(flowFile, PyStreamCallback(old_position, line_count))
>             session.transfer(flowFile, REL_SUCCESS)
>             stateManager.replace(stateMap, new_map, Scope.LOCAL)

它的工作原理是注册状态(根据路径名:因此是上次读取的旧位置)并读取BufferedReader中的流文件(流)行。我的流文件可能相当大(XXX MB),因此这种方法效率很低。“Output”变量,很快就会变得很大,而我的Java堆(1.5GB)对于一个小文件(180MB)已经用完了

    # Loop indefinitely
    i = 0
    tailled =[]
    while True:
        # Get the next line
        line = reader.readLine()
        # If there is no more content, break out of the loop
        if line is None:
            break
        else:
            i += 1
        if i > self.__old_position:
            tailled.append(line)
    outputs = '\n'.join(tailled)

我知道文件中的行数和我需要从哪里开始(在我的例子中,这是一个流),您有没有想法以最好的方式获取最后几行? 我愿意从python转换

我想添加另一个处理器来执行读取(ExecuteStreamCommand)

谢谢你的帮助


Tags: 文件fromimportselfifsessioncountline