我找不到更好的方法来描述我所面临的错误,但这个错误似乎每次我试图实现循环调用的多处理时都会出现。在
我两者都用过sklearn.externals.joblib以及多处理过程但是错误是相似的,虽然不同。在
要应用多处理的原始循环,其中一个迭代在单个线程/进程中执行
for dd in final_col_dates:
idx1 = final_col_dates.tolist().index(dd)
dataObj = GetPrevDataByDate(d1, a, dd, self.start_hour_of_day)
data2 = dataObj.fit()
dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, self.DC, frgt_typ_filter)
data3 = dataObj.fit()
if idx1 > 0:
data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan
iter += 1
days_out_vars.append(data3)
为了将上面的代码snipet实现为多处理,我创建了一个方法,除了For循环,上面的代码都在这里。在
使用Joblib,下面是我的代码片段。在
^{pr2}$变量return_list是共享变量,在ParallelLoopTest方法中执行。声明如下:
manager = Manager()
return_list = manager.list()
使用上面的代码片段,我面临以下错误:
Process SpawnPoolWorker-3:
Traceback (most recent call last):
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 249, in _bootstrap
self.run()
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker
task = get()
File "C:\Users\dkanhar\Anaconda3\lib\site-packages\sklearn\externals\joblib\pool.py", line 359, in get
return recv()
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\connection.py", line 251, in recv
return ForkingPickler.loads(buf.getbuffer())
TypeError: function takes at most 0 arguments (1 given)
我也尝试了多处理模块来执行上面提到的代码,但是仍然面临类似的错误。使用多处理模块运行以下代码:
for dd in final_col_dates:
# multiprocessing.Pipe(False)
p = multiprocessing.Process(target=self.ParallelLoopTest, args=(dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, return_list))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
而且,我面临着以下的错误回溯:
File "<string>", line 1, in <module>
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\spawn.py", line 106, in spawn_main
exitcode = _main(fd)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\spawn.py", line 116, in _main
self = pickle.load(from_parent)
TypeError: function takes at most 0 arguments (1 given)
Traceback (most recent call last):
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 457, in <module>
print(obj.fit())
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 39, in fit
return self.__driver__()
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 52, in __driver__
final = self.process_()
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 135, in process_
sch_dat = self.inline_apply_(all_dates_schd, d1, d2, a)
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 297, in inline_apply_
p.start()
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\context.py", line 212, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\context.py", line 313, in _Popen
return Popen(process_obj)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
reduction.dump(process_obj, to_child)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\reduction.py", line 59, in dump
ForkingPickler(file, protocol).dump(obj)
BrokenPipeError: [Errno 32] Broken pipe
所以,我试着取消了这行的注释多处理.管道(错误)认为这可能是因为使用了管道,我禁用了管道,但问题仍然存在,我面临同样的错误。在
如果有任何帮助,以下是我的方法ParallerLoopTest:
def ParallelLoopTest(self, dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, days_out_vars):
idx1 = final_col_dates.tolist().index(dd)
dataObj = GetPrevDataByDate(d1, a, dd, self.start_hour_of_day)
data2 = dataObj.fit()
dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, self.DC, frgt_typ_filter)
data3 = dataObj.fit()
if idx1 > 0:
data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan
print("Iter ", iter)
iter += 1
days_out_vars.append(data3)
我之所以说相似的错误是因为如果你看一下这两个错误的回溯,它们之间都有相似的错误线:
TypeError:函数最多接受0个参数(给定1个)从Pickle加载时,我不知道为什么会发生这种情况。在
另外请注意,我之前已经在其他项目中成功地实现了这两个模块,但从未遇到过问题,所以我不知道为什么现在开始出现这个问题,以及这个问题的确切含义。在
任何帮助都将非常感谢,因为我已经浪费时间调试这三天了。在
谢谢
在最后一个答案后编辑1
回答之后,我试了下面这个。 添加了decorator@staticmethod,删除了self,并使用DataPrep.ParallelLoopTest(args)。在
另外,将方法移出类DataPrep,并简单地由ParallelLoopTest(args)调用
但在这两种情况下,误差都是一样的。在
附言:我试着在这两种情况下使用joblib。 所以,两种解决方案都不起作用。在
新方法定义:
def ParallelLoopTest(dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, days_out_vars, DC, start_hour):
idx1 = final_col_dates.tolist().index(dd)
dataObj = GetPrevDataByDate(d1, a, dd, start_hour_of_day)
data2 = dataObj.fit()
dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, DC, frgt_typ_filter)
data3 = dataObj.fit()
if idx1 > 0:
data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan
print("Iter ", iter)
iter += 1
days_out_vars.append(data3)
编辑2:
我遇到了一个错误,因为Python无法提取一些大的数据帧。我的参数/参数中有两个数据帧,一个是20MB左右,另一个是200 MB的pickle格式。但这不应该是个问题,对吧?我们应该能够传递熊猫数据帧。如果我错了,请纠正我。在
另外,解决方法是在方法调用之前用随机名称将数据帧保存为csv,传递文件名并读取csv,但这是一个缓慢的过程,因为它涉及到大量csv文件的推理。有什么建议吗?在
实际上,在这两种情况下都会出现完全相同的错误,但在一个示例(
joblib
)中使用Process
时,在主线程中不会得到相同的失败/回溯,因为它们没有以相同的方式管理进程失败。在这两种情况下,您的进程似乎都无法取消对新的
Process
中的子作业的拾取。Pool
返回取消拾取错误,而使用Process
,则会出现错误,因为当子进程因该取消拾取错误而死亡时,它会关闭主线程用于写入数据的管道,从而导致主进程中出现错误。在我的第一个想法是,错误是由于您尝试pickle一个实例方法而导致的,而您应该在这里尝试使用静态方法(使用实例方法似乎不正确,因为对象没有在进程之间共享)。}参数。在
在声明
ParallelLoopTest
之前使用装饰符@staticmethod
,并删除{编辑: 另一种可能是其中一个参数
dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, return_list
无法取消拾取。显然,它来自panda.DataFrame
。我看不出在这种情况下取消拾取失败的任何原因,但我不太清楚
panda
。一种解决方法是将数据转储到临时文件中。您可以查看此链接here,以获得
panda.DataFrame
的有效序列化。另一个解决方案是使用DataFrame.to_pickle
方法和panda.read_pickle
将其转储/检索到文件中。在请注意,最好将}进行比较,而不是与{}进行比较。在
joblib.Parallel
与{相关问题 更多 >
编程相关推荐