<h3>Question</h3>
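<p>I would like to use the multiprocessing module (<code>multiprocessing.Pool.starmap()</code>) for feature engineering.
However, it fails with the error message shown below. I suspect the error is related to the size of the inputs (2147483647 = 2^31 − 1?), because the same code runs without problems on a small fraction (<code>frac=0.05</code>) of the input DataFrames (<code>train_scala</code>, <code>test</code>, <code>ts</code>). I have already converted the DataFrame columns to the smallest possible types, but that did not help.</p>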
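<p>The Anaconda version is 4.3.30 and the Python version is 3.6 (64-bit).
The system has more than 128 GB of memory and more than 20 cores.
Could you suggest a pointer or a solution for this problem? If it is caused by passing large data through the multiprocessing module, how much smaller does the data need to be for multiprocessing to work on Python 3?</p>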
<p><strong>Code:</strong></p>
<pre><code>from multiprocessing import Pool, cpu_count
from itertools import repeat
p = Pool(8)
is_train_seq = [True]*len(historyCutoffs)+[False]
config_zip = zip(historyCutoffs, repeat(train_scala), repeat(test), repeat(ts), ul_parts_path, repeat(members), is_train_seq)
p.starmap(multiprocess_FE, config_zip)
</code></pre>
<p><strong>Error message:</strong></p>
<pre><code>Traceback (most recent call last):
  File "main_1210_FE_scala_multiprocessing.py", line 705, in <module>
    print('----Pool starmap start----')
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 274, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
    put(task)
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
    header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
</code></pre>
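<p>The last frame of the traceback packs the message length with <code>struct.pack("!i", n)</code>, a signed 32-bit integer, which would explain the 2^31 − 1 bound. A minimal sketch that reproduces the same <code>struct.error</code> in isolation follows; the helper names <code>pack_length_header</code> and <code>fits_in_header</code> are made up for illustration and are not part of <code>multiprocessing</code>.</p>

```python
import struct

# Largest value a signed 32-bit integer can hold: 2147483647.
INT32_MAX = 2**31 - 1


def pack_length_header(n):
    """Pack a payload length the way the traceback shows: big-endian signed int32."""
    return struct.pack("!i", n)


def fits_in_header(n):
    """Return True if a payload of n bytes can be described by the "!i" header."""
    try:
        pack_length_header(n)
        return True
    except struct.error:
        return False


if __name__ == "__main__":
    print(fits_in_header(INT32_MAX))      # True
    print(fits_in_header(INT32_MAX + 1))  # False
```

<p>If this reading is right, any single pickled message larger than 2147483647 bytes would fail in the same way, regardless of how much memory the machine has.</p>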
<h3>Additional information</h3>
<ul>
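<li><code>historyCutoffs</code> is a list of integers</li>
<li><code>train_scala</code> is a pandas DataFrame (377 MB)</li>
<li><code>test</code> is a pandas DataFrame (15 MB)</li>
<li><code>ts</code> is a pandas DataFrame (547 MB)</li>
<li><code>ul_parts_path</code> is a list of directories (strings)</li>
<li><code>is_train_seq</code> is a list of booleans</li>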
</ul>
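<p>The sizes listed above are not necessarily the sizes of the pickled objects, which is what actually travels through the pool's pipe. A rough sketch for measuring the pickled size of one task's arguments and comparing it with the 2147483647-byte bound from the error message follows. <code>pickled_size</code> and <code>fits_in_one_message</code> are hypothetical helpers, and the small <code>bytes</code> payload is only a stand-in for the real DataFrames.</p>

```python
import pickle

# Bound taken from the struct.error message: 2^31 - 1 bytes.
INT32_MAX = 2**31 - 1


def pickled_size(obj):
    """Return the number of bytes pickle produces for obj."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


def fits_in_one_message(task_args):
    """Return True if the pickled arguments stay within the int32 length bound."""
    return pickled_size(task_args) <= INT32_MAX


if __name__ == "__main__":
    # Stand-in for one tuple produced by the zip(...) of task arguments.
    example_args = (20170101, b"x" * 1000, "some/path", True)
    print(pickled_size(example_args), fits_in_one_message(example_args))
```

<p>Calling <code>pickled_size</code> on one real argument tuple should show how close a single task comes to the bound. Note that pickling a large object temporarily needs additional memory of about the same size.</p>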
<p><strong>Additional code: the <code>multiprocess_FE</code> function</strong></p>
<pre><code>import gzip

import pandas as pd


def multiprocess_FE(historyCutoff, train_scala, test, ts, ul_part_path, members, is_train):
    # enrich_by_features is defined elsewhere in the script (not shown here).
    train_dict = {}
    ts_dict = {}
    msno_dict = {}
    ul_dict = {}
    # Training tasks use the rows for this cutoff; the final task uses the test set.
    if is_train:
        train_dict[historyCutoff] = train_scala[train_scala.historyCutoff == historyCutoff]
    else:
        train_dict[historyCutoff] = test
    msno_dict[historyCutoff] = set(train_dict[historyCutoff].msno)
    print('length of msno is {:d} in cutoff {:d}'.format(len(msno_dict[historyCutoff]), historyCutoff))
    # Keep only transactions up to the cutoff for the selected members.
    ts_dict[historyCutoff] = ts[(ts.transaction_date <= historyCutoff) & (ts.msno.isin(msno_dict[historyCutoff]))]
    print('length of transaction is {:d} in cutoff {:d}'.format(len(ts_dict[historyCutoff]), historyCutoff))
    ul_part = pd.read_csv(gzip.open(ul_part_path, mode="rt"))  ##.sample(frac=0.01, replace=False)
    ul_dict[historyCutoff] = ul_part[ul_part.msno.isin(msno_dict[historyCutoff])]
    train_dict[historyCutoff] = enrich_by_features(historyCutoff, train_dict[historyCutoff], ts_dict[historyCutoff], ul_dict[historyCutoff], members, is_train)
</code></pre>
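<p>Because every task receives <code>train_scala</code>, <code>test</code>, <code>ts</code> and <code>members</code> as arguments, those frames are pickled and sent through the pool's pipe. One pattern that may avoid this is to hand the large objects to each worker once through the <code>initializer</code> and <code>initargs</code> parameters of <code>Pool</code>, and to pass only small values per task. The sketch below is illustrative only. <code>init_worker</code>, <code>count_rows_up_to</code> and <code>run</code> are hypothetical names, and a plain list stands in for the DataFrames. It also assumes a platform where workers are started by fork (the paths in the traceback suggest Linux), so that the initializer arguments are inherited by the workers rather than sent over the task pipe.</p>

```python
from multiprocessing import Pool

# Per-process storage; each worker fills it once in init_worker.
_shared = {}


def init_worker(big_table):
    """Runs once in every worker process and stores the large object."""
    _shared["big_table"] = big_table


def count_rows_up_to(cutoff):
    """Per-task work: only the small cutoff value is passed as an argument."""
    table = _shared["big_table"]
    return sum(1 for value in table if value <= cutoff)


def run(cutoffs, big_table, processes=2):
    """Start a pool whose workers hold big_table, then map over the cutoffs."""
    with Pool(processes, initializer=init_worker, initargs=(big_table,)) as pool:
        return pool.map(count_rows_up_to, cutoffs)


if __name__ == "__main__":
    print(run([2, 5], list(range(10))))  # [3, 6]
```

<p>With this layout each task payload is a single integer, so the pickled message stays far below the 2^31 − 1 byte bound. Whether it fits the real feature-engineering code depends on details not shown here.</p>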