将处理器添加到python多处理中没有速度增益

2024-10-03 06:27:22 发布

您现在位置:Python中文网/ 问答频道 /正文

无法通过为 multiprocessing 增加更多进程来缩短总计算时间。使用 3 个进程与使用 7 个进程运行所需的时间相同。

我尝试过将数据分块,这样每个进程都能处理更大的计算集,结果相同。我也改为在每个进程初始化时载入静态数据,而不是作为参数传递。我还尝试过让 pool.map 返回 DataFrame,而不是将其写入文件。我对 EvalContract 部分进行了计时:使用 3 个进程时,处理 1 个合同和 1 个场景需要 35 秒;使用 7 个进程运行相同的合同和场景却需要 55 秒。

import itertools as it
import multiprocessing as mp
import os
import time
from functools import partial

import numpy as np
import pandas as pd

def initializer(Cont, Scen, RandWD, dicS):
    """Pool initializer: publish the shared read-only inputs as module globals.

    Runs once per worker process at pool startup so that ValueProj and
    EvalContract can read the data from globals instead of having it pickled
    and shipped with every task.
    """
    global dfCleanCont, dfScen, dfWithdrawalRandom, dicSensit
    dfCleanCont, dfScen, dfWithdrawalRandom, dicSensit = Cont, Scen, RandWD, dicS

def ValueProj(ContScen):
    """Outer-loop valuation for one (contract, scenario) pair.

    ContScen: 2-tuple of (contract PTS index label, scenario "Trial" number).
    Reads the worker globals set by initializer(): dfCleanCont, dfScen,
    dfWithdrawalRandom.
    Returns dfContProj, the projection DataFrame for this contract/scenario
    (per the author's note below, shape (601, 35)).
    """
    # Select this contract's row; .loc with a single label yields a Series.
    Contract = dfCleanCont.loc[ContScen[0]]
    PTS = Contract.name  # the contract's PTS index label
    # Random-withdrawal path keyed by the contract's WD_ID value.
    ProjWDs = dfWithdrawalRandom[Contract['WD_ID']]
    # All timesteps belonging to the requested scenario trial.
    dfScenOneSet = dfScen[dfScen["Trial"]==ContScen[1]]
    '''Do various projection calculations. All calculation in numpy arrays then converted to DataFrame before returning. Dataframe shape[601,35]'''
    # NOTE(review): dfContProj must be produced by the elided calculation
    # above; as posted, the name is undefined and this would raise NameError.
    return dfContProj

def ReserveProjectionPreprocess(Scen,dfBarclayRates,dicProjOuterSeries,liProjValContract):
    """Inner-loop reserve calculation for one outer-loop timestep.

    Scen: scenario rows for one trial (has 'Timestep' and the interest-rate
        columns listed in dicSensit['irCols']).
    dfBarclayRates: rates table indexed by timestep.
    dicProjOuterSeries: dict of outer-loop arrays bundled by EvalContract.
    liProjValContract: list whose first element carries 'Outer_t'.
    Returns a one-column Series keyed by PTS/OuterScenNum/Outer_t/Reserve,
    later concatenated and merged back in EvalContract.

    NOTE(review): the call site in EvalContract builds
    partial(ReserveProjectionPreprocess, dfScenOneSet, dicProjOuterSeries)
    and maps one item per call -- only 3 of these 4 parameters are supplied,
    and dfBarclayRates would receive dicProjOuterSeries. Confirm the intended
    signature/binding order.
    NOTE(review): Contract, ContractValProjOne, PVL and AssetHaircut are not
    defined in the visible code -- presumably produced by the elided inner
    loop; verify before relying on this.
    """
    # Outer timestep this inner loop starts from.
    Timestep = liProjValContract[0]['Outer_t']
    dfInnerLoopScen = SetupInnerLoopScenarios(Timestep,Scen,dicSensit)
    # Build the discount curve from this timestep's rate term points.
    BBC = BuildBarclayCurve(Timestep,Scen[Scen['Timestep']==Timestep][dicSensit['irCols']].iloc[0].to_list(),dfBarclayRates.loc[Timestep],dicSensit)
    '''Do various inner loop projection calculations, up to 601 timesteps. All calculation in numpy arrays.'''
    # Reserve is the shortfall of PV of liabilities over (market value less
    # asset haircut), floored at zero.
    return pd.Series({'PTS': Contract.name,
                    'OuterScenNum': ContractValProjOne['OuterScenNum'],
                    'Outer_t': ContractValProjOne['Outer_t'],
                    'Reserve': max(PVL-ContractValProjOne['MV']-AssetHaircut,0)})

def EvalContract(liCS):
    """Worker task: evaluate a chunk of (contract, scenario) pairs.

    liCS: list of 2-tuples (contract PTS label, scenario Trial number) as
    chunked in __main__. For each pair, runs the outer-loop projection,
    computes reserves per timestep, merges them back, and writes one parquet
    file per pair; returns the sentinel 1 to the parent process.
    Reads worker globals dfCleanCont, dfScen, dicSensit (set by initializer).
    """
    for CS in liCS:
        '''Evaluate single contract with single scenario'''
        # NOTE(review): `time` and `os` are used in this function but not
        # imported in the visible code; start_time is also never read again.
        start_time = time.time()
        dfOuterLoop = ValueProj(CS)
        Contract = dfCleanCont.loc[CS[0]]
        PTS = Contract.name  # contract identifier used in the output filename
        dfScenOneSet = dfScen[dfScen["Trial"]==CS[1]]
        # Drop zero-book-value rows and the final row before the inner loop.
        dfOuterLoopCut = dfOuterLoop[dfOuterLoop['BV']!=0][:-1]
        # Earliest timestep (>0) at which MV/BV reaches zero; falls back to
        # the full horizon (ProjectionYrs*12 months) when it never does.
        MinMVt = dicSensit['ProjectionYrs']*12 if sum(dfOuterLoop[(dfOuterLoop['MV']==0) & (dfOuterLoop['Outer_t']>0)]['Outer_t'])==0 else min(dfOuterLoop[(dfOuterLoop['MV']==0) & (dfOuterLoop['Outer_t']>0)]['Outer_t'])
        MinBVt = dicSensit['ProjectionYrs']*12 if sum(dfOuterLoop[(dfOuterLoop['BV']==0) & (dfOuterLoop['Outer_t']>0)]['Outer_t'])==0 else min(dfOuterLoop[(dfOuterLoop['BV']==0) & (dfOuterLoop['Outer_t']>0)]['Outer_t'])
        # Bundle the outer-loop columns as numpy arrays for the inner loop.
        dicProjOuterSeries = {'Contract': Contract,
                            'BaseLapsePartContribution': dfOuterLoop['BaseLapsePartContribution'].values,
                            'BaseLapsePartNetTransfer': dfOuterLoop['BaseLapsePartNetTransfer'].values,
                            'BaseLapsePartWithdrawal': dfOuterLoop['BaseLapsePartWithdrawal'].values,
                            'PrudentEstDynPartWDPct': dfOuterLoop['PrudentEstDynPartWDPct'].values,
                            'KnownPutQueueWD': dfOuterLoop['KnownPutQueueWD'].values,
                            'BaseLapsePlanSponsor': dfOuterLoop['BaseLapsePlanSponsor'].values,
                            'PrudentEstDynPlanWDPct': dfOuterLoop['PrudentEstDynPlanWDPct'].values,
                            'MonthlyDefaultCharge': dfOuterLoop['MonthlyDefaultCharge'].values,
                            'Outer_t_Maturity': min(MinMVt,MinBVt)-1}
        # One single-row list per remaining outer-loop timestep.
        liProjValContract=[]
        for _,row in dfOuterLoopCut.iterrows():
            liProjValContract.append([row])

        # NOTE(review): `partial` is used but functools is not imported in the
        # visible code. ReserveProjectionPreprocess declares 4 parameters
        # (Scen, dfBarclayRates, dicProjOuterSeries, liProjValContract) but
        # only 2 are bound here plus 1 mapped item -- dfBarclayRates appears
        # to be missing; confirm the intended signature.
        func=partial(ReserveProjectionPreprocess,dfScenOneSet,dicProjOuterSeries)
        # Column-concat the per-timestep Series, then transpose to rows.
        dfReserve = pd.concat(map(func,liProjValContract),axis=1,ignore_index=True).T
        # Left-join reserves back onto the outer loop; unmatched rows get 0.
        dfOuterLoopwRes = pd.merge(dfOuterLoop,dfReserve,how='left',on=['PTS','OuterScenNum','Outer_t'])
        dfOuterLoopwRes['Reserve'].fillna(value=0,inplace=True)
        fname='OuterProjection_{0}_{1}.parquet'.format(PTS,CS[1])
        dfOuterLoopwRes.to_parquet(os.path.join(dicSensit['OutputDirOuterLoop'],fname),index=False)
    return 1

if __name__ == '__main__':
    # Placeholder inputs (real loading code elided in the post).
    # Fix: the first literal was missing its closing quote (syntax error).
    dfCleanCont = 'DataFrame of 150 contract data. Each row is a contract with various info such as market value, interest rate, maturity date, etc. Indentifier index is "PTS". Shape[150,41]'
    dfScen = 'DataFrame of interest rate scenarios. 100 scenarios("Trial"). Each scenario has 601 timesteps and 11 interest rate term points. Shape[60100,13]'
    liContID = list(dfCleanCont.index)
    liScenID = dfScen["Trial"].unique()
    # Cartesian product: every contract paired with every scenario.
    liCS = list(it.product(liContID, liScenID))

    # Chunk the pairs so each task amortizes IPC overhead over n pairs.
    n = 10
    liCSGroup = [liCS[x:x + n] for x in range(0, len(liCS), n)]

    # NOTE(review): dfWithdrawalRandom and dicSensit are not defined in the
    # visible code -- presumably loaded earlier; confirm before running.
    # Fix: the original passed ns.dfCleanCont / ns.dfScen from an undefined
    # namespace object `ns`; the locals defined above are what exists here.
    # The pool context manager guarantees cleanup even on error; map() is
    # synchronous, so all results are in before the pool is torn down.
    with mp.Pool(7, initializer,
                 (dfCleanCont, dfScen, dfWithdrawalRandom, dicSensit)) as pool:
        # Fix: EvalContract returns the sentinel 1 per chunk (the DataFrames
        # go to parquet files), so the original pd.concat over these ints
        # would raise TypeError. Just collect the sentinels; any worker
        # exception is re-raised here by map().
        results = pool.map(EvalContract, liCSGroup)

期望更多的处理器能显著提高速度。某处有个瓶颈,但我好像找不到。尝试使用cProfile,但使用3或7个处理器获得相同的累计时间


Tags: inas处理器ptspdvaluespoolouter