Duplicating a FlowFile with the ExecuteScript processor

Published 2024-10-02 12:36:04


I have a FlowFile and an attribute whose value is a list. I am trying to duplicate the FlowFile once for each element in that attribute.

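This is how it should work: the attribute (`tia_clients: ['A','B','C']`) holds a list of elements. For each element in the list I try to duplicate the FlowFile, i.e. produce one FlowFile with the attribute set to 'A', another with 'B', and so on.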
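NiFi stores every attribute value as a string, so `tia_clients` has to be parsed back into a list before it can be iterated. A minimal sketch, assuming the attribute holds a Python-style literal such as `['A','B','C']`; the variable `raw` is a hypothetical stand-in for `flowFile.getAttribute('tia_clients')`:

```python
import ast
import json

# Hypothetical stand-in for flowFile.getAttribute('tia_clients'):
# NiFi returns the value as a plain string, not as a list.
raw = "['A','B','C']"

# ast.literal_eval evaluates Python literals only (no arbitrary code),
# so it is a safe way to turn the string back into a list.
clients = ast.literal_eval(raw)

# json.loads rejects the same string because JSON requires double quotes.
try:
    json.loads(raw)
    json_ok = True
except ValueError:
    json_ok = False

for client in clients:
    print(client)  # one duplicate FlowFile would be emitted per value
```

If the upstream processor can be changed to emit JSON (`["A","B","C"]`), `json.loads` is the stricter choice; `ast.literal_eval` is what accepts the single-quoted form shown here.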

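Here is the code I have written. It generates as many FlowFiles as there are elements in the attribute list, but every FlowFile is 0 bytes (the content is not generated).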

What am I missing?

```python
import sys
sys.path.append('/usr/lib/python2.7/site-packages')
from picoredis import Redis
reload(sys)
sys.setdefaultencoding('utf8')
import ast

import sys
import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback, StreamCallback


# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        self.charset = StandardCharsets.UTF_8
        self.parentFlowFile = None

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        outputStream.write(text.encode('utf-8'))

#Get the flow file
parentFlowFile = session.get()

#check if the flowfile is not blank
if(parentFlowFile != None):
    #Enclose the code in try block to catch the thrown exceptions
    try:
        flowfiles_list = []
        #Call the class which does the parsing
        splitCallback = PyStreamCallback()
        splitCallback.parentFlowFile = parentFlowFile
        clients = parentFlowFile.getAttribute('tia_clients')
        clients = ast.literal_eval(clients)

        for client in clients:
            flowFile = session.create(parentFlowFile)
            if (flowFile != None):
                try:
                    flowFile = session.write(flowFile, splitCallback)
                    flowFile = session.putAttribute(flowFile, unicode('client').encode('utf-8'), unicode(client).encode('utf-8'))
                    flowfiles_list.append(flowFile)

                except Exception as e:
                    session.remove(flowFile)
                    raise

        for flow in flowfiles_list:
            session.transfer(flow, REL_SUCCESS)

    #The except block to catch the exceptions thrown by the try block
    except Exception as e:
        parentFlowFile = session.putAttribute(parentFlowFile, unicode('Error').encode('utf-8'), unicode(e).encode('utf-8'))
        tb = sys.exc_info()[2]
        parentFlowFile = session.putAttribute(parentFlowFile, unicode('Error line number').encode('utf-8'), unicode(tb.tb_lineno).encode('utf-8'))
        #if there are exceptions thrown, the flow file will be moved to the failure queue
        session.transfer(parentFlowFile, REL_FAILURE)

session.remove(parentFlowFile)
```
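A likely cause, offered as a sketch rather than a confirmed answer: `session.create(parentFlowFile)` returns a child that inherits the parent's attributes but starts with empty content, so `PyStreamCallback` copies an empty input stream into an empty output stream. `session.clone(parentFlowFile)` copies both content and attributes, which makes the callback unnecessary. The sketch also moves `session.remove(parentFlowFile)` onto the success path: in the code above it also runs after the parent has been transferred to `REL_FAILURE`, and when `session.get()` returned `None`, which NiFi typically rejects. The function name `split_by_clients` and the `nifi_session` guard are hypothetical additions so the script can be exercised outside NiFi; it assumes ASCII client names (under Jython 2, pass a `unicode` value instead of `str(client)` for non-ASCII names).

```python
import ast


def split_by_clients(session, rel_success, rel_failure):
    """Emit one copy of the incoming FlowFile per entry in 'tia_clients'."""
    parent = session.get()
    if parent is None:
        return  # nothing queued; leave the session untouched

    children = []
    try:
        clients = ast.literal_eval(parent.getAttribute('tia_clients'))
        for client in clients:
            # clone() copies content AND attributes; create(parent) only
            # inherits attributes, which is why the output was 0 bytes.
            child = session.clone(parent)
            child = session.putAttribute(child, 'client', str(client))
            children.append(child)
    except Exception as e:
        # Children that are neither transferred nor removed would make the
        # session fail on commit, so drop any that were already cloned.
        for child in children:
            session.remove(child)
        parent = session.putAttribute(parent, 'Error', str(e))
        session.transfer(parent, rel_failure)
        return

    for child in children:
        session.transfer(child, rel_success)
    # Remove the original only on the success path, never after it has
    # already been transferred to the failure relationship.
    session.remove(parent)


# ExecuteScript binds `session`, `REL_SUCCESS` and `REL_FAILURE` for the
# script; outside NiFi (e.g. in a unit test) they are absent, so skip.
try:
    nifi_session = session
except NameError:
    nifi_session = None
if nifi_session is not None:
    split_by_clients(nifi_session, REL_SUCCESS, REL_FAILURE)
```

Because `clone()` already carries the content, no `session.write()` call or `StreamCallback` subclass is needed just to duplicate a FlowFile; a callback is only worth keeping if each copy's content must actually change.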
