与multiprocessing.p共享大数据帧

2024-10-01 17:23:38 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个函数,我想用多重处理并行计算。该函数接受一个参数,但也从两个非常大的数据帧加载子集,这两个数据帧已经加载到内存中(其中一个约为1G,另一个略高于6G)。你知道吗

largeDF1 = pd.read_csv(directory + 'name1.csv')
largeDF2 = pd.read_csv(directory + 'name2.csv')

def f(x):
    load_content1 = largeDF1.loc[largeDF1['FirstRow'] == x]
    load_content2 = largeDF1.loc[largeDF1['FirstRow'] == x]
    #some computation happens here
    new_data.to_csv(directory + 'output.csv', index = False)

def main():
    multiprocessing.set_start_method('spawn', force = True)
    pool = multiprocessing.Pool(processes = multiprocessing.cpu_count())
    input = input_data['col']
    pool.map_async(f, input)
    pool.close()
    pool.join()

问题是这些文件太大了,当我在多个内核上运行它们时,会出现内存问题。我想知道是否有一种方法可以在所有进程之间共享加载的文件。你知道吗

我试过manager(),但没能成功。感谢您的帮助。谢谢。你知道吗


Tags: csv数据函数内存readinputdefload
1条回答
网友
1楼 · 发布于 2024-10-01 17:23:38

如果您在类UNIX的系统上运行这个程序(默认情况下使用forkstartmethod),那么数据将在开箱即用的情况下共享。大多数操作系统使用“写时拷贝”作为内存页。因此,即使您多次fork一个进程,只要您不修改那些数据帧,它们也会共享包含这些数据帧的大部分内存页。你知道吗

但是当使用spawnstart方法时,每个工作进程都必须加载数据帧。我不确定操作系统是否足够聪明,可以共享内存页。或者实际上,这些派生的进程都具有相同的内存布局。你知道吗

我能想到的唯一可移植的解决方案是将数据留在磁盘上,并使用workers中的mmap将其映射到只读的内存中。这样操作系统就会注意到多个进程映射同一个文件,并且只加载一个副本。你知道吗

缺点是数据将以磁盘上的csv格式存储在内存中,这样就可以从中读取数据(而无需复制!)不太方便。因此,您可能希望事先将数据准备成更易于使用的形式。例如,将数据从'FirstRow'转换成floatdouble的二进制文件,您可以用struct.iter_unpack进行迭代。你知道吗

下面的函数(来自我的statusline脚本)使用mmap统计邮箱文件中的邮件量。你知道吗

def mail(storage, mboxname):
    """
    Report unread mail.
    Arguments:
        storage: a dict with keys (unread, time, size) from the previous call or an empty dict.
            This dict will be *modified* by this function.
        mboxname (str): name of the mailbox to read.
    Returns: A string to display.
    """
    stats = os.stat(mboxname)
    if stats.st_size == 0:
        return 'Mail: 0'
    # When mutt modifies the mailbox, it seems to only change the
    # ctime, not the mtime! This is probably releated to how mutt saves the
    # file. See also stat(2).
    newtime = stats.st_ctime
    newsize = stats.st_size
    if not storage or newtime > storage['time'] or newsize != storage['size']:
        with open(mboxname) as mbox:
            with mmap.mmap(mbox.fileno(), 0, prot=mmap.PROT_READ) as mm:
                start, total = 0, 1  # First mail is not found; it starts on first line...
                while True:
                    rv = mm.find(b'\n\nFrom ', start)
                    if rv == -1:
                        break
                    else:
                        total += 1
                        start = rv + 7
                start, read = 0, 0
                while True:
                    rv = mm.find(b'\nStatus: R', start)
                    if rv == -1:
                        break
                    else:
                        read += 1
                        start = rv + 10
        unread = total - read
        # Save values for the next run.
        storage['unread'], storage['time'], storage['size'] = unread, newtime, newsize
    else:
        unread = storage['unread']
    return f'Mail: {unread}'

在本例中,我使用mmap,因为它比仅仅读取文件快4倍。参见normal readingusing mmap。你知道吗

相关问题 更多 >

    热门问题