多重处理中的类似错误。函数的参数数目不匹配

2024-09-22 20:28:21 发布

您现在位置:Python中文网/ 问答频道 /正文

我找不到更好的方法来描述我所面临的错误,但这个错误似乎每次我试图实现循环调用的多处理时都会出现。在

我两者都用过sklearn.externals.joblib以及多处理过程但是错误是相似的,虽然不同。在

要应用多处理的原始循环,其中一个迭代在单个线程/进程中执行

for dd in final_col_dates:
    idx1 = final_col_dates.tolist().index(dd)

    dataObj = GetPrevDataByDate(d1, a, dd, self.start_hour_of_day)
    data2 = dataObj.fit()

    dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, self.DC, frgt_typ_filter)
    data3 = dataObj.fit()

    if idx1 > 0:
       data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan

    iter += 1

    days_out_vars.append(data3)

为了将上面的代码snipet实现为多处理,我创建了一个方法,除了For循环,上面的代码都在这里。在

使用Joblib,下面是我的代码片段。在

^{pr2}$

变量return_list是共享变量,在ParallelLoopTest方法中执行。声明如下:

manager = Manager()
return_list = manager.list()

使用上面的代码片段,我面临以下错误:

Process SpawnPoolWorker-3:
Traceback (most recent call last):
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 249, in _bootstrap
  self.run()
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 93, in run
  self._target(*self._args, **self._kwargs)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker
  task = get()
File "C:\Users\dkanhar\Anaconda3\lib\site-packages\sklearn\externals\joblib\pool.py", line 359, in get
  return recv()
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\connection.py", line 251, in recv
  return ForkingPickler.loads(buf.getbuffer())
TypeError: function takes at most 0 arguments (1 given)

我也尝试了多处理模块来执行上面提到的代码,但是仍然面临类似的错误。使用多处理模块运行以下代码:

for dd in final_col_dates:
    # multiprocessing.Pipe(False)
    p = multiprocessing.Process(target=self.ParallelLoopTest, args=(dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, return_list))
    jobs.append(p)
    p.start()

for proc in jobs:
    proc.join()

而且,我面临着以下的错误回溯:

File "<string>", line 1, in <module>
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\spawn.py", line 106, in spawn_main
   exitcode = _main(fd)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\spawn.py", line 116, in _main
   self = pickle.load(from_parent)
TypeError: function takes at most 0 arguments (1 given)
Traceback (most recent call last):
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 457, in <module>
   print(obj.fit())
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 39, in fit
return self.__driver__()
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 52, in __driver__
   final = self.process_()
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 135, in process_
   sch_dat = self.inline_apply_(all_dates_schd, d1, d2, a)
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 297, in inline_apply_
   p.start()
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 105, in start
   self._popen = self._Popen(self)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\context.py", line 212, in _Popen
   return _default_context.get_context().Process._Popen(process_obj)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\context.py", line 313, in _Popen
   return Popen(process_obj)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
   reduction.dump(process_obj, to_child)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\reduction.py", line 59, in dump
   ForkingPickler(file, protocol).dump(obj)
   BrokenPipeError: [Errno 32] Broken pipe

所以,我试着取消了这行的注释多处理.管道(错误)认为这可能是因为使用了管道,我禁用了管道,但问题仍然存在,我面临同样的错误。在

如果有任何帮助,以下是我的方法ParallerLoopTest:

def ParallelLoopTest(self, dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, days_out_vars):
    idx1 = final_col_dates.tolist().index(dd)

    dataObj = GetPrevDataByDate(d1, a, dd, self.start_hour_of_day)
    data2 = dataObj.fit()

    dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, self.DC, frgt_typ_filter)
    data3 = dataObj.fit()

    if idx1 > 0:
        data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan

    print("Iter ", iter)
    iter += 1

    days_out_vars.append(data3)

我之所以说相似的错误是因为如果你看一下这两个错误的回溯,它们之间都有相似的错误线:

TypeError:函数最多接受0个参数(给定1个)从Pickle加载时,我不知道为什么会发生这种情况。在

另外请注意,我之前已经在其他项目中成功地实现了这两个模块,但从未遇到过问题,所以我不知道为什么现在开始出现这个问题,以及这个问题的确切含义。在

任何帮助都将非常感谢,因为我已经浪费时间调试这三天了。在

谢谢

在最后一个答案后编辑1

回答之后,我试了下面这个。 添加了decorator@staticmethod,删除了self,并使用DataPrep.ParallelLoopTest(args)。在

另外,将方法移出类DataPrep,并简单地由ParallelLoopTest(args)调用

但在这两种情况下,误差都是一样的。在

附言:我试着在这两种情况下使用joblib。 所以,两种解决方案都不起作用。在

新方法定义:

def ParallelLoopTest(dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, days_out_vars, DC, start_hour):
    idx1 = final_col_dates.tolist().index(dd)

    dataObj = GetPrevDataByDate(d1, a, dd, start_hour_of_day)
    data2 = dataObj.fit()

    dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, DC, frgt_typ_filter)
    data3 = dataObj.fit()

    if idx1 > 0:
        data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan

    print("Iter ", iter)
    iter += 1

    days_out_vars.append(data3)

编辑2:

我遇到了一个错误,因为Python无法提取一些大的数据帧。我的参数/参数中有两个数据帧,一个是20MB左右,另一个是200 MB的pickle格式。但这不应该是个问题,对吧?我们应该能够传递熊猫数据帧。如果我错了,请纠正我。在

另外,解决方法是在方法调用之前用随机名称将数据帧保存为csv,传递文件名并读取csv,但这是一个缓慢的过程,因为它涉及到大量csv文件的推理。有什么建议吗?在


Tags: inpyself错误linecolmultiprocessingusers
1条回答
网友
1楼 · 发布于 2024-09-22 20:28:21

实际上,在这两种情况下都会出现完全相同的错误,但在一个示例(joblib)中使用Process时,在主线程中不会得到相同的失败/回溯,因为它们没有以相同的方式管理进程失败。
在这两种情况下,您的进程似乎都无法取消对新的Process中的子作业的拾取。Pool返回取消拾取错误,而使用Process,则会出现错误,因为当子进程因该取消拾取错误而死亡时,它会关闭主线程用于写入数据的管道,从而导致主进程中出现错误。在

我的第一个想法是,错误是由于您尝试pickle一个实例方法而导致的,而您应该在这里尝试使用静态方法(使用实例方法似乎不正确,因为对象没有在进程之间共享)。
在声明ParallelLoopTest之前使用装饰符@staticmethod,并删除{}参数。在

编辑: 另一种可能是其中一个参数dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, return_list无法取消拾取。显然,它来自panda.DataFrame
我看不出在这种情况下取消拾取失败的任何原因,但我不太清楚panda
一种解决方法是将数据转储到临时文件中。您可以查看此链接here,以获得panda.DataFrame的有效序列化。另一个解决方案是使用DataFrame.to_pickle方法和panda.read_pickle将其转储/检索到文件中。在

请注意,最好将joblib.Parallel与{}进行比较,而不是与{}进行比较。在

相关问题 更多 >