我试图通过使用Python中的concurrnet.futures包来充分利用我的CPU能力(因为它被限制在最大15%左右),从而加速我的代码。然而,无论我做什么尝试,我都会遇到相同的中断进程池错误
我在这个网站上遇到了很多问题,并尝试实施了我理解的每一个解决方案(我还没有那么丰富的经验),但似乎无法解决这个错误
所以我要做的是导入一个巨大的.csv文件,并根据特定条件对其进行过滤。现在我知道我这样做可能不是最合乎逻辑的,但我也在努力学习如何为未来的情况做多重处理
我的代码如下
'Load Master Data'
MasterData = pd.read_csv('FINAL MASTER DATA.csv')
MasterData.reset_index(drop = True, inplace = True)
print('Data Loaded')
这会将文件加载到一个包含150多万行和18列数据的数据框中
'Format Forecast Date'
ForecastinWk = pd.DataFrame()
ForecastinWk['WkYr'] = MasterData.apply(lambda x: str(x['Forecasted in Wk']) + str('-') + str(x['Forecasted in Yr']), axis = 1)
print('Weeks & Years gathered')
ForecastinWk['WkYr'] = ForecastinWk['WkYr'].apply(lambda x: datetime.datetime.strptime('1-' + x, '%w-%W-%Y'))
print('WkYr formatted')
这段代码然后将日期部分格式化为我所需的格式
def UseRowSelector(i, Masterdata, ForecastinWk):
print(i)
print('Determining whether to use row')
MasterDataForecastedMonth = MasterData.loc[i,'Forecast for Month']
LeadtimeDate = ForecastinWk.loc[i,'Date']
if MasterDataForecastedMonth == LeadtimeDate:
UseRow = True
else:
UseRow = False
return UseRow,i
因此,我希望这段代码返回[True,Nr]或[False,Nr],并打印“I”和文本“确定是否使用行”(两者都不会出现在控制台中)
def main(MasterData, ForecastinWk):
MasterDataIndexList = list(MasterData.index)
UseRows = pd.DataFrame()
print('creating multiple processes')
with concurrent.futures.ProcessPoolExecutor() as executor:
print('executor created')
future_to_index = {executor.submit(UseRowSelector, i, MasterData, ForecastinWk): i for i in [1,2,3,4,5,6,7]}
print('future created')
for future in concurrent.futures.as_completed(future_to_index):
print('selected future')
UseRow = future_to_index[future]
Result = future.result()
try:
UseRows.loc[Result[1],0] = Result[0]
print(Result[1])
except:
print('error')
return UseRows
本节应调用prep数据并调用UseRowSelector函数。由于数据有超过150万行,我试图使用多处理来允许不同的CPU内核为不同的行执行UseRowSelector函数(因此它们彼此不依赖)
然后,在调用实际的多处理函数并稍后将输出保存到.csv文件之前,应针对不同的预测交付周期时间段运行以下命令,并准备数据进行处理
'Set Forecast Leadtime in wks'
for Leadtime in [5,10,15,20,25,30]:
print(str('LEAD TIME -------------------- ' + str(Leadtime) + ' Weeks --------------------'))
ForecastinWk['Date'] = ForecastinWk['WkYr'].apply(lambda x: x + pd.DateOffset(weeks = Leadtime))
print('Dates offset')
ForecastinWk['Date'] = ForecastinWk['Date'].apply(lambda x: x.strftime('%b %Y'))
print('Dates formatted')
if __name__ == '__main__':
print('name is main')
UseRows = main(MasterData, ForecastinWk)
print('Creating Master Data Section')
MasterDataSelection = MasterData[UseRows.iloc[:,0]]
MasterDataSelection.reset_index(drop = True, inplace = True)
MasterDataSelection.to_csv(str('MasterDataDelen/' + str(Leadtime) + 'Wk Leadtime.csv'), index = False)
MasterDataSelection.to_csv(str('MasterDataDelen/' + str(Leadtime) + 'Wk Leadtime - Acc.csv'))
print('excel created')
根据对其他海报的回答,我添加了if __name__ = '__main__'
部分,并确保将所需的全部数据传递给调用的函数
如果我尝试手动运行这些部分;我可以一直运行程序,直到打印('selected future')部分-因此,当我运行future.result()时,它将导致错误:BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending
如果我尝试立即运行完整脚本(F5),我仍然会看到正在打印“selected future”打印语句,但此后我的CPU使用率下降到0%,我看不到任何情况发生-没有引发错误,但python会继续处理(如果我尝试中止,则不会发生任何情况…)
谁知道我可能做错了什么?我在Windows上使用Spyder(Python 3.7)
谢谢
目前没有回答
相关问题 更多 >
编程相关推荐