无法通过给 multiprocessing 增加更多进程来缩短总计算时间:用 3 个进程和用 7 个进程运行所需的时间相同。
我尝试过将数据分块,让每个进程处理更大的一批计算,结果相同。我已经通过 initializer 在每个进程中初始化静态数据,而不是作为参数传递。我也尝试过让 pool.map 返回 DataFrame,而不是写入文件。我对 EvalContract 部分计了时:使用 3 个进程时,处理 1 个合同、1 个场景需要 35 秒;使用 7 个进程运行同样的合同和场景则需要 55 秒。
import itertools as it
import multiprocessing as mp
import os
import time
from functools import partial

import numpy as np
import pandas as pd
def initializer(Cont, Scen, RandWD, dicS):
    """Pool initializer: publish shared read-only data as worker globals.

    Installing the large objects once per worker process avoids re-pickling
    them as arguments on every task dispatched to the pool.
    """
    global dfCleanCont, dfScen, dfWithdrawalRandom, dicSensit
    dfCleanCont = Cont
    dfScen = Scen
    dfWithdrawalRandom = RandWD
    dicSensit = dicS
def ValueProj(ContScen):
    """Run the outer-loop value projection for one (contract, scenario) pair.

    Args:
        ContScen: 2-tuple of (contract PTS index, scenario Trial number).

    Returns:
        dfContProj: projection DataFrame (shape [601, 35] per the inline note).

    Relies on the worker-process globals installed by ``initializer``
    (dfCleanCont, dfScen, dfWithdrawalRandom).
    """
    # Look up the contract row by its PTS index (first element of the pair).
    Contract = dfCleanCont.loc[ContScen[0]]
    PTS = Contract.name
    # Pre-generated random withdrawal path keyed by the contract's WD_ID.
    ProjWDs = dfWithdrawalRandom[Contract['WD_ID']]
    # Restrict the scenario table to the single requested Trial.
    dfScenOneSet = dfScen[dfScen["Trial"]==ContScen[1]]
    '''Do various projection calculations. All calculation in numpy arrays then converted to DataFrame before returning. Dataframe shape[601,35]'''
    # NOTE(review): dfContProj is produced by the elided calculations above;
    # as excerpted, this skeleton would raise NameError if run as-is.
    return dfContProj
def ReserveProjectionPreprocess(Scen,dfBarclayRates,dicProjOuterSeries,liProjValContract):
    """Compute the reserve at one outer-loop timestep via an inner-loop projection.

    Args:
        Scen: scenario rows for the single outer Trial being evaluated.
        dfBarclayRates: Barclay curve rates indexed by timestep.
        dicProjOuterSeries: dict of outer-loop projection arrays built in EvalContract.
        liProjValContract: single-element list holding the outer-loop row to process.

    Returns:
        pd.Series with keys 'PTS', 'OuterScenNum', 'Outer_t' and 'Reserve'.

    NOTE(review): the partial(...) in EvalContract binds only two arguments
    before the mapped element, one short of this four-argument signature —
    dfBarclayRates appears to go unsupplied. Confirm against the full source.
    """
    # Outer timestep this inner-loop run is anchored at.
    Timestep = liProjValContract[0]['Outer_t']
    # Build the inner-loop scenario set for this timestep (helper defined elsewhere).
    dfInnerLoopScen = SetupInnerLoopScenarios(Timestep,Scen,dicSensit)
    # Interest-rate curve for this timestep from the outer scenario's rate columns.
    BBC = BuildBarclayCurve(Timestep,Scen[Scen['Timestep']==Timestep][dicSensit['irCols']].iloc[0].to_list(),dfBarclayRates.loc[Timestep],dicSensit)
    '''Do various inner loop projection calculations, up to 601 timesteps. All calculation in numpy arrays.'''
    # NOTE(review): Contract, ContractValProjOne, PVL and AssetHaircut come from
    # the elided calculations above; as excerpted this skeleton raises NameError.
    return pd.Series({'PTS': Contract.name,
                      'OuterScenNum': ContractValProjOne['OuterScenNum'],
                      'Outer_t': ContractValProjOne['Outer_t'],
                      'Reserve': max(PVL-ContractValProjOne['MV']-AssetHaircut,0)})
def EvalContract(liCS):
    """Evaluate a chunk of (contract, scenario) pairs in one worker task.

    For each pair: run the outer-loop projection, compute per-timestep
    reserves via ReserveProjectionPreprocess, merge them back, and write the
    result to a parquet file.

    Args:
        liCS: list of 2-tuples (contract PTS index, scenario Trial number).

    Returns:
        1 as a completion flag (the real output is persisted to disk).

    Relies on the worker-process globals installed by ``initializer``.
    """
    for CS in liCS:
        # --- outer-loop projection for this contract/scenario pair ---
        dfOuterLoop = ValueProj(CS)
        Contract = dfCleanCont.loc[CS[0]]
        PTS = Contract.name
        dfScenOneSet = dfScen[dfScen["Trial"] == CS[1]]
        # Keep only rows with non-zero book value, excluding the last of them.
        dfOuterLoopCut = dfOuterLoop[dfOuterLoop['BV'] != 0][:-1]

        # First timestep (>0) at which MV / BV hits zero; fall back to the
        # projection horizon when it never does. Each mask is computed once
        # (the original built every mask twice and used sum()==0 as an
        # emptiness test).
        Horizon = dicSensit['ProjectionYrs'] * 12
        MVZeroT = dfOuterLoop.loc[(dfOuterLoop['MV'] == 0) & (dfOuterLoop['Outer_t'] > 0), 'Outer_t']
        BVZeroT = dfOuterLoop.loc[(dfOuterLoop['BV'] == 0) & (dfOuterLoop['Outer_t'] > 0), 'Outer_t']
        MinMVt = Horizon if MVZeroT.empty else MVZeroT.min()
        MinBVt = Horizon if BVZeroT.empty else BVZeroT.min()

        # Outer-loop arrays handed to the inner-loop reserve calculation.
        dicProjOuterSeries = {'Contract': Contract,
                              'BaseLapsePartContribution': dfOuterLoop['BaseLapsePartContribution'].values,
                              'BaseLapsePartNetTransfer': dfOuterLoop['BaseLapsePartNetTransfer'].values,
                              'BaseLapsePartWithdrawal': dfOuterLoop['BaseLapsePartWithdrawal'].values,
                              'PrudentEstDynPartWDPct': dfOuterLoop['PrudentEstDynPartWDPct'].values,
                              'KnownPutQueueWD': dfOuterLoop['KnownPutQueueWD'].values,
                              'BaseLapsePlanSponsor': dfOuterLoop['BaseLapsePlanSponsor'].values,
                              'PrudentEstDynPlanWDPct': dfOuterLoop['PrudentEstDynPlanWDPct'].values,
                              'MonthlyDefaultCharge': dfOuterLoop['MonthlyDefaultCharge'].values,
                              'Outer_t_Maturity': min(MinMVt, MinBVt) - 1}

        # One single-row task per remaining outer-loop timestep
        # (comprehension replaces the original iterrows+append loop).
        liProjValContract = [[row] for _, row in dfOuterLoopCut.iterrows()]

        # NOTE(review): ReserveProjectionPreprocess takes four positional
        # arguments (Scen, dfBarclayRates, dicProjOuterSeries,
        # liProjValContract) but this partial supplies only two plus the
        # mapped element — dfBarclayRates appears to be missing. Confirm
        # against the full source.
        func = partial(ReserveProjectionPreprocess, dfScenOneSet, dicProjOuterSeries)
        dfReserve = pd.concat(map(func, liProjValContract), axis=1, ignore_index=True).T

        dfOuterLoopwRes = pd.merge(dfOuterLoop, dfReserve, how='left',
                                   on=['PTS', 'OuterScenNum', 'Outer_t'])
        # Plain assignment instead of fillna(inplace=True) on a column
        # selection: avoids the chained-assignment pitfall and the pandas
        # deprecation of inplace fillna on a slice.
        dfOuterLoopwRes['Reserve'] = dfOuterLoopwRes['Reserve'].fillna(0)

        fname = 'OuterProjection_{0}_{1}.parquet'.format(PTS, CS[1])
        dfOuterLoopwRes.to_parquet(os.path.join(dicSensit['OutputDirOuterLoop'], fname), index=False)
    return 1
if __name__ == '__main__':
    # Placeholder descriptions stand in for the real data loads.
    # (Closing quote added on the first literal — it was a syntax error.)
    dfCleanCont = 'DataFrame of 150 contract data. Each row is a contract with various info such as market value, interest rate, maturity date, etc. Indentifier index is "PTS". Shape[150,41]'
    dfScen = 'DataFrame of interest rate scenarios. 100 scenarios("Trial"). Each scenario has 601 timesteps and 11 interest rate term points. Shape[60100,13]'

    # Cartesian product of every contract with every scenario: one work item
    # per (contract, scenario) pair.
    liContID = list(dfCleanCont.index)
    liScenID = dfScen["Trial"].unique()
    liCS = list(it.product(liContID, liScenID))

    # NOTE(review): dfWithdrawalRandom and dicSensit are not defined in this
    # excerpt; they must be constructed before the pool is created.
    # The locals are passed directly to the initializer — the original
    # referenced an undefined namespace object ('ns.dfCleanCont').
    pool = mp.Pool(7, initializer, (dfCleanCont, dfScen, dfWithdrawalRandom, dicSensit,))

    # Chunk the work so each task carries n pairs, reducing dispatch overhead.
    n = 10
    liCSGroup = [liCS[x:x + n] for x in range(0, len(liCS), n)]

    # EvalContract persists its output to parquet and returns the int 1, so
    # just drain the map (the original pd.concat over ints raised TypeError).
    results = pool.map(func=EvalContract, iterable=liCSGroup)
    pool.close()
    pool.join()
期望增加进程数能显著提高速度。某处存在瓶颈,但我找不到在哪里。尝试过 cProfile,但使用 3 个或 7 个进程得到的累计时间相同。
目前没有回答
相关问题 更多 >
编程相关推荐