使用daskhdf/parqu的Python大数据集特征工程工作流

2024-05-18 19:14:13 发布

您现在位置:Python中文网/ 问答频道 /正文

SO中已经有了一个很好的问题,但是最好的答案现在已经有5年了,所以我认为2018年应该会有更好的选择。在

我目前正在寻找一个大于内存的数据集的特性工程管道(使用合适的数据类型)。在

初始文件是一个csv,它不适合内存。以下是我的需求:

  1. 创建特性(主要是在多个列上使用groupby操作)
  2. 将新功能合并到以前的数据(在磁盘上,因为它不适合内存)
  3. 对一些ML应用程序使用子集(或全部)列/索引
  4. 重复1/2/3(这是一个迭代过程,类似于day1:create4 功能,第2天:再创建4个…)

尝试使用拼花地板和护墙板:

首先,我将大的csv文件分成多个小的“parquet”文件。这样,dask对于计算新特性非常有效,但是,我需要将它们合并到初始数据集和atm中,我们不能向parquet文件添加新列。通过块读取csv,合并并重新保存到多个parquet文件是非常耗时的,因为功能工程在这个项目中是一个迭代过程。在

尝试使用HDF和dask:

然后我转向HDF,因为我们可以添加列,也可以使用特殊查询,它仍然是二进制文件存储。为了使用与DASK的并发写入(HDF不允许),我再次使用相同的key='base'将大csv文件拆分为多个HDF。在

data = data.repartition(npartitions=10) # otherwise it was saving 8Mo files using to_hdf
data.to_hdf('./hdf/data-*.hdf', key='base', format='table', data_columns=['day'], get=dask.threaded.get)

附录问题:指定数据列对于dask似乎没有用处,因为在中没有“where”读什么?

与我预期的不同,我无法使用以下代码将新功能合并到多个小文件中:

^{pr2}$

与螺纹接头我得到“python停止工作”在2%之后。 与dask.multiprocessing.get.获取创建新文件需要花费很多时间

最适合此工作流的工具(存储和处理)是什么?在


Tags: 文件csv数据key内存功能dataget
2条回答

我会认真考虑使用数据库(索引访问)作为存储,甚至使用apachespark(以分布式/集群方式处理数据)和Hive/Impala作为后端。。。在

我将只复制一个关于fastparquet的related issue的注释:从技术上讲,可以向现有的parquet数据集添加列,但这并没有在fastparquet中实现,也可能不会在任何其他parquet实现中实现。在

编写这样的代码可能不会太麻烦(但目前还没有计划):对write columns的调用是按顺序进行的,因此需要向下渗透到该函数中的新列,以及与页脚中元数据的当前第一个字节相对应的文件位置。另外,模式需要单独更新(这很简单)。对于数据集的每个文件,都需要重复该过程。这不是问题的“答案”,但也许有人想承担这项任务。在

相关问题 更多 >

    热门问题