python struct.error: 'i' format requires -2147483648 <= number <= 2147483647

Published 2024-06-26 14:09:32


Question

I want to use the multiprocessing module (multiprocessing.Pool.starmap()) for feature engineering. However, it raises the error message below. I suspect the error is related to the size of the input (2147483647 = 2^31 - 1?), because the same code runs fine on a small fraction (frac=0.05) of the input DataFrames (train_scala, test, ts). I converted the DataFrames' dtypes to the smallest possible types, but it didn't help.

The Anaconda version is 4.3.30 and the Python version is 3.6 (64-bit). The machine has more than 128GB of memory and more than 20 cores. Do you have any suggestions for solving this problem? If it is caused by passing large data to the multiprocessing module, how much smaller does the data need to be in order to use multiprocessing on Python 3?

Code:

from multiprocessing import Pool, cpu_count
from itertools import repeat    
p = Pool(8)
is_train_seq = [True]*len(historyCutoffs)+[False]
config_zip = zip(historyCutoffs, repeat(train_scala), repeat(test), repeat(ts), ul_parts_path, repeat(members), is_train_seq)
p.starmap(multiprocess_FE, config_zip)

Error message:

Traceback (most recent call last):
  File "main_1210_FE_scala_multiprocessing.py", line 705, in <module>
    print('----Pool starmap start----')
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 274, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
    put(task)
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
    header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647

Additional information

  • historyCutoffs is a list of integers
  • train_scala is a pandas DataFrame (377MB)
  • test is a pandas DataFrame (15MB)
  • ts is a pandas DataFrame (547MB)
  • ul_parts_path is a list of directory paths (strings)
  • is_train_seq is a list of booleans

Additional code: the method multiprocess_FE

import gzip

import pandas as pd

def multiprocess_FE(historyCutoff, train_scala, test, ts, ul_part_path, members, is_train):
    train_dict = {}
    ts_dict = {}
    msno_dict = {}
    ul_dict = {}
    if is_train:
        train_dict[historyCutoff] = train_scala[train_scala.historyCutoff == historyCutoff]
    else:
        train_dict[historyCutoff] = test
    msno_dict[historyCutoff] = set(train_dict[historyCutoff].msno)
    print('length of msno is {:d} in cutoff {:d}'.format(len(msno_dict[historyCutoff]), historyCutoff))
    ts_dict[historyCutoff] = ts[(ts.transaction_date <= historyCutoff) & (ts.msno.isin(msno_dict[historyCutoff]))]
    print('length of transaction is {:d} in cutoff {:d}'.format(len(ts_dict[historyCutoff]), historyCutoff))
    ul_part = pd.read_csv(gzip.open(ul_part_path, mode="rt"))  ##.sample(frac=0.01, replace=False)
    ul_dict[historyCutoff] = ul_part[ul_part.msno.isin(msno_dict[historyCutoff])]
    train_dict[historyCutoff] = enrich_by_features(historyCutoff, train_dict[historyCutoff], ts_dict[historyCutoff], ul_dict[historyCutoff], members, is_train)

2 Answers

This issue was fixed in a recent Python PR: https://github.com/python/cpython/pull/10305

If you want, you can apply this change locally so it works for you right away, without waiting for a new Python and Anaconda release.

The communication protocol between processes uses pickling, and the pickled data is prefixed with its size. For your method, all the arguments are pickled together as a single object.

You produced an object which, when pickled, is larger than fits in the 'i' struct format (a four-byte signed integer), which breaks the assumptions the code makes.
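To see the limit concretely, here is a minimal sketch reproducing the overflow with the same `"!i"` format the connection layer uses, plus a quick way to measure how large one task tuple would be on the wire (the `args` tuple below is a hypothetical stand-in for one of your starmap tasks):

```python
import pickle
import struct

# multiprocessing's connection layer prefixes each payload with
# struct.pack("!i", n): a 4-byte signed big-endian length field.
# Any pickle longer than 2**31 - 1 bytes therefore cannot be framed.
try:
    struct.pack("!i", 2**31)  # one byte past the maximum
except struct.error as e:
    print(e)  # 'i' format requires -2147483648 <= number <= 2147483647

# A quick check of how big one task would be when pickled:
args = ([20170101, 20170201], "parts/ul_part_0.csv.gz", True)  # hypothetical task
print(len(pickle.dumps(args)))
```

If `len(pickle.dumps(...))` of your real argument tuple (which includes the three DataFrames) exceeds 2147483647 bytes, you get exactly the traceback above.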

You could instead delegate reading the DataFrames to the child processes, sending only the metadata needed to load them. Their combined size is close to 1GB, which is far too much data to share between processes over a pipe.
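A minimal sketch of that idea: each worker receives only a short path string and loads the file itself, so nothing large is ever pickled. This uses the stdlib `csv` module and a temporary file to stay self-contained; in your code the worker would call `pd.read_csv` on the real paths, and `load_and_count` is a hypothetical stand-in for `multiprocess_FE`:

```python
import csv
import os
import tempfile
from multiprocessing import Pool

def load_and_count(path):
    # Only the small path string crosses the pipe; the worker
    # reads the (potentially huge) file on its own side.
    with open(path, newline="") as f:
        rows = list(csv.DictReader(f))
    return len(rows)

def run_demo():
    with tempfile.NamedTemporaryFile("w", suffix=".csv",
                                     delete=False, newline="") as f:
        f.write("msno,historyCutoff\na,1\nb,2\nc,3\n")
        path = f.name
    try:
        with Pool(2) as p:
            counts = p.map(load_and_count, [path, path])
    finally:
        os.unlink(path)
    return counts

if __name__ == "__main__":
    print(run_demo())  # [3, 3]
```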

Quoting the Programming guidelines section:

Better to inherit than pickle/unpickle

When using the spawn or forkserver start methods many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.

If you are not running on Windows and don't use the spawn or forkserver start methods, you can load the DataFrames as globals before starting the child processes; the children will then "inherit" the data via the OS's normal copy-on-write memory page sharing mechanism.
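The inherit-via-globals pattern above can be sketched as follows; `BIG_DATA` is a hypothetical stand-in for train_scala/ts/members, and in your code it would be created with `pd.read_csv` before the Pool is constructed:

```python
from multiprocessing import Pool

# Loaded at module level, BEFORE the Pool exists, so forked workers
# inherit it via copy-on-write instead of receiving it over the pipe.
BIG_DATA = list(range(1_000_000))  # stand-in for the large DataFrames

def worker(cutoff):
    # Reads the inherited global; only the small `cutoff` integer
    # is pickled and sent to the worker.
    return cutoff + BIG_DATA[-1]

def run_demo():
    with Pool(2) as p:
        return p.map(worker, [1, 2, 3])

if __name__ == "__main__":
    print(run_demo())  # [1000000, 1000001, 1000002]
```

Applied to the question's code, `config_zip` would then carry only `historyCutoffs`, `ul_parts_path`, and `is_train_seq`, with the `repeat(...)` DataFrame arguments dropped.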
