py4j - Multiprocessing with Java member objects

Posted 2024-10-02 12:29:41

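I have two Java objects that each take some input and return an output. To make them available in Python, I start the JVM with subprocess.Popen, running the command java -Xmx3g -Xms3g -jar <fullpath\test.jar>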
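The launch step described above can be sketched as follows. This is a minimal illustration, not the asker's actual code: the helper names `build_java_command` and `start_jvm` are hypothetical, and the jar path is left as a parameter because the original path is elided.

```python
import subprocess


def build_java_command(jar_path, heap='3g'):
    """Return the argument list for launching the jar with a fixed heap size."""
    # Passing a list (rather than one string) avoids shell quoting problems.
    return ['java', f'-Xmx{heap}', f'-Xms{heap}', '-jar', jar_path]


def start_jvm(jar_path):
    """Start the JVM in the background and return the Popen handle."""
    # Popen returns immediately; the Java gateway server may need a moment
    # before it accepts connections.
    return subprocess.Popen(build_java_command(jar_path))
```

Keeping the returned handle around makes it possible to call `terminate()` on the Java process once scoring is finished.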

I then build the gateway like this:

from py4j.java_gateway import JavaGateway, GatewayParameters, CallbackServerParameters

gateway = JavaGateway(
        gateway_parameters=GatewayParameters(port=11001),
        callback_server_parameters=CallbackServerParameters(port=21001))

# load functions
modelCallCard = gateway.entry_point.modelCallCard
modelCallFlexi = gateway.entry_point.modelCallFlexi

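modelCallCard and modelCallFlexi are Java functions (I don't know much about them, because I can't find them documented anywhere). They work fine as long as I don't use multiprocessing: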

def modelCall(args):
    PRODUCT = args['PRODUCT']
    derivedVarsJson = args['derivedVarsJson']

    if PRODUCT.upper() == 'CARD':
        scoreInfoJson = modelCallCard(derivedVarsJson)
    else:
        scoreInfoJson = modelCallFlexi(derivedVarsJson)
    return scoreInfoJson

def main():
    df = pd.read_csv(inputFileName, sep=dlm_out, encoding='utf-8', header=0, index_col='_index_', usecols=['_index_', 'ACC_ID', 'PRODUCT', 'derivedVarsJson'])

    df['_source_'] = df[['PRODUCT', 'derivedVarsJson']].to_dict(orient='records')
    df['scoreInfoJson'] = df['_source_'].map(modelCall)


if __name__ == '__main__':
    main()

I want to use multiprocessing to call the function above in parallel. To do that, I changed the function to:

def outer_func(args2):
    temp_df = args2[0]
    modelCallCard = args2[1]
    modelCallFlexi = args2[2]

    def modelCall(args):
        PRODUCT = args['PRODUCT']
        derivedVarsJson = args['derivedVarsJson']

        if PRODUCT.upper() == 'CARD':
            scoreInfoJson = modelCallCard(derivedVarsJson)
        else:
            scoreInfoJson = modelCallFlexi(derivedVarsJson)
        return scoreInfoJson

    temp_df['scoreInfoJson'] = temp_df['_source_'].map(modelCall)

    return temp_df

To call the modified function with multiprocessing, I made the following changes:

def main():
    cpuCount = 2
    pool = multiprocessing.Pool(processes=cpuCount)
    reader = pd.read_csv(inputFileName, sep=dlm_out, encoding='utf-8', header=0, chunksize=chunkSize, index_col='_index_', usecols=['_index_', 'ACC_ID', 'PRODUCT', 'derivedVarsJson'])

    for c, df in enumerate(reader):
        logger.info(f'iterating ({c}) ...')

        df['_source_'] = df[['PRODUCT', 'derivedVarsJson']].to_dict(orient='records')

        dfs = np.array_split(df, cpuCount)
        args = [(dfs[i], modelCallCard, modelCallFlexi) for i in range(cpuCount)]

        dfResults = pool.map(outer_func, args)
        df = pd.concat(dfResults)
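    # Close the pool only after every chunk has been processed;
    # closing it inside the loop would break the second pool.map call.
    pool.close()
    pool.join()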

if __name__ == '__main__':
    main()

But this code raises an error:

py4j.protocol.Py4JError: An error occurred while calling t.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:274)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Unknown Source)
