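I have two Java objects that take some input and return an output. To make them available in Python, I start the JAR with:
subprocess.Popen(['java', '-Xmx3g', '-Xms3g', '-jar', r'<fullpath\test.jar>'])
Then I build the gateway like this: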
gateway = JavaGateway(
    gateway_parameters=GatewayParameters(port=11001),
    callback_server_parameters=CallbackServerParameters(port=21001))
# load functions
modelCallCard = gateway.entry_point.modelCallCard
modelCallFlexi = gateway.entry_point.modelCallFlexi
modelCallCard and modelCallFlexi are Java functions (I don't know much about them, because I can't find them anywhere). They work fine as long as I don't use multiprocessing:
def modelCall(args):
    PRODUCT = args['PRODUCT']
    derivedVarsJson = args['derivedVarsJson']
    if PRODUCT.upper() == 'CARD':
        scoreInfoJson = modelCallCard(derivedVarsJson)
    else:
        scoreInfoJson = modelCallFlexi(derivedVarsJson)
    return scoreInfoJson

def main():
    df = pd.read_csv(inputFileName, sep=dlm_out, encoding='utf-8', header=0, index_col='_index_', usecols=['_index_', 'ACC_ID', 'PRODUCT', 'derivedVarsJson'])
    df['_source_'] = df[['PRODUCT', 'derivedVarsJson']].to_dict(orient='records')
    df['scoreInfoJson'] = df['_source_'].map(modelCall)

if __name__ == '__main__':
    main()
I want to use multiprocessing to call the function above in parallel. To do that, I changed the function to:
def outer_func(args2):
    temp_df = args2[0]
    modelCallCard = args2[1]
    modelCallFlexi = args2[2]

    def modelCall(args):
        PRODUCT = args['PRODUCT']
        derivedVarsJson = args['derivedVarsJson']
        if PRODUCT.upper() == 'CARD':
            scoreInfoJson = modelCallCard(derivedVarsJson)
        else:
            scoreInfoJson = modelCallFlexi(derivedVarsJson)
        return scoreInfoJson

    temp_df['scoreInfoJson'] = temp_df['_source_'].map(modelCall)
    return temp_df
To call the modified function with multiprocessing, I made the following changes:
def main():
    cpuCount = 2
    pool = multiprocessing.Pool(processes=cpuCount)
    reader = pd.read_csv(inputFileName, sep=dlm_out, encoding='utf-8', header=0, chunksize=chunkSize, index_col='_index_', usecols=['_index_', 'ACC_ID', 'PRODUCT', 'derivedVarsJson'])
    for c, df in enumerate(reader):
        logger.info(f'iterating ({c}) ...')
        df['_source_'] = df[['PRODUCT', 'derivedVarsJson']].to_dict(orient='records')
        dfs = np.array_split(df, cpuCount)
        args = [(dfs[i], modelCallCard, modelCallFlexi) for i in range(cpuCount)]
        dfResults = pool.map(outer_func, args)
        df = pd.concat(dfResults)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
But this code raises an error:
py4j.protocol.Py4JError: An error occurred while calling t.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)