有没有一个好方法可以避免内存深度复制或减少在多处理上花费的时间?

2024-10-01 09:40:07 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用Python环境的Pandas模块制作一个基于内存的“大数据”实时计算模块。在

所以响应时间是这个模块的质量,非常关键和重要。在

为了处理大数据集,我并行地拆分数据和处理子拆分数据。在

在存储子数据的结果部分,花费了大量的时间(第21行)。在

我认为内部内存深度复制出现或者传递的子数据没有在内存中共享。在

<>如果我用C或C++编写模块,我会使用下面的指针或引用。在

“进程=进程(target=addNewDerivedColumn,args=[resultList,&sub\u dataframe])”

或者

“进程=进程(target=addNewDerivedColumn,args=[resultList,sub\u dataframe])

def addNewDerivedColumn(结果列表,拆分\u sub\u dataframe&;):。。。。 ““

有没有一个好方法可以避免内存深度复制或减少在多处理上花费的时间? “不雅”也不错。 我准备把我的代码弄脏。 我试过weekref,RawValue,RawArray,Value,Pool,但都失败了。在

该模块正在MacOS中开发,最后将在Linux或Unix上运行。在

不要考虑Windows操作系统。在

密码来了。在

真正的代码在我的办公室里,但是结构和逻辑和真实的一样。在

1 #-*- coding: UTF-8 -*-' 
2 import pandas as pd
3 import numpy as np
4 from multiprocessing import *
5 import time
6
7
8 def addNewDerivedColumn(resultList, split_sub_dataframe):
9    
10    split_sub_dataframe['new_column']=    np.abs(split_sub_dataframe['column_01']+split_sub_dataframe['column_01']) / 2
11    
12    print split_sub_dataframe.head()
13    
14    '''
15     i think that the hole result of sub-dataframe is copied to resultList, not reference value 
16     and in here time spend much
17     compare elapsed time of comment 21th line with the uncommented one
18     In MS Windows, signifiant difference of elapsed time doesn't show up
19     In Linux or Mac OS, the difference is big
20    '''
21    resultList.append(split_sub_dataframe)
22    
23
24
25 if __name__ == "__main__":
26    
27    # example data generation
28    # the record count of the real data is over 1 billion with about 10 columns.
29    dataframe = pd.DataFrame(np.random.randn(100000000, 4), columns=['column_01', 'column_02', 'column_03', 'column_04'])
30    
31
32    print 'start...'
33    start_time = time.time()
34    
35    # to launch 5 process in parallel, I split the dataframe to five sub-dataframes
36    split_dataframe_list = np.array_split(dataframe, 5)
37    
38    # multiprocessing 
39    manager = Manager()
40    
41    # result list
42    resultList=manager.list()
43    processList=[]
44    
45    for sub_dataframe in split_dataframe_list:
46        process=Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])
47        processList.append(process)
48        
49    for proc in processList: 
50        proc.start()
51    for proc in processList: 
52        proc.join()
53    
54    
55    print 'elapsed time  : ', np.round(time.time() - start_time,3)

Tags: 模块ofthe数据内存inimportdataframe
2条回答

如果保持进程间通信到 最小值。因此,与其将子数据帧作为参数传递,不如传递 索引值。子进程可以分割公共数据帧本身。在

生成子进程时,它将获得在 调用父进程的模块。因此,如果大数据帧df是 在globals中定义之前生成一个多处理池,然后每个 派生的子进程将有权访问df。在

在没有fork()的Windows上,将启动一个新的python进程,并且 调用模块已导入。因此,在Windows上,派生的子进程必须 从头开始重新生成df,这可能需要时间和大量额外的内存。在

但是,在Linux上,您有“写时拷贝”功能。这意味着 子进程访问(调用模块的)原始全局变量,而不访问 复制他们。只有当子进程试图修改全局时,Linux才会这样做 然后在修改该值之前创建一个单独的副本。在

因此,如果您避免在 子流程。我建议只使用子进程进行计算。返回 值的计算,并让主进程整理结果进行修改 原始数据帧。在

import pandas as pd
import numpy as np
import multiprocessing as mp
import time

def compute(start, end):
    sub = df.iloc[start:end]
    return start, end, np.abs(sub['column_01']+sub['column_01']) / 2

def collate(retval):
    start, end, arr = retval
    df.ix[start:end, 'new_column'] = arr

def window(seq, n=2):
    """
    Returns a sliding window (of width n) over data from the sequence
    s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...
    """
    for i in range(len(seq)-n+1):
        yield tuple(seq[i:i+n])

if __name__ == "__main__":
    result = []
    # the record count of the real data is over 1 billion with about 10 columns.
    N = 10**3
    df = pd.DataFrame(np.random.randn(N, 4),
                      columns=['column_01', 'column_02', 'column_03', 'column_04'])

    pool = mp.Pool()    
    df['new_column'] = np.empty(N, dtype='float')

    start_time = time.time()
    idx = np.linspace(0, N, 5+1).astype('int')
    for start, end in window(idx, 2):
        # print(start, end)
        pool.apply_async(compute, args=[start, end], callback=collate)

    pool.close()
    pool.join()
    print 'elapsed time  : ', np.round(time.time() - start_time,3)
    print(df.head())

受到这个问题和@unutbu的答案的启发,我在github上写了一个map的并行版本。该函数适用于多核单机无限并行处理只读大数据结构。基本思想类似于@unutbu suggested,使用临时全局变量保存大数据结构(例如,数据帧),并将其“名称”而不是变量本身传递给工人。但所有这些都封装在一个map函数中,因此它几乎是在pathos包的帮助下对标准map函数的一个简单替换。示例用法如下:

# Suppose we process a big dataframe with millions of rows.
size = 10**9
df = pd.DataFrame(np.random.randn(size, 4),
                  columns=['column_01', 'column_02', 
                           'column_03', 'column_04'])
# divide df into sections of 10000 rows; each section will be
# processed by one worker at a time
section_size = 10000
sections = [xrange(start, start+section_size) 
            for start in xrange(0, size, section_size)]

# The worker function that processes one section of the
# df. The key assumption is that a child 
# process does NOT modify the dataframe, but do some 
# analysis or aggregation and return some result.
def func(section, df):
    return some_processing(df.iloc[section])

num_cores = 4
# sections (local_args) specify which parts of a big object to be processed;
# global_arg holds the big object to be processed to avoid unnecessary copy;
# results are a list of objects each of which is the processing results 
# of one part of a big object (i.e., one element in the iterable sections) 
# in order.
results = map(func, sections, global_arg=df,
              chunksize=10, 
              processes=num_cores)

# reduce results (assume it is a list of data frames)
result = pd.concat(results)

在我的一些文本挖掘任务中,直接将df传递给worker函数的朴素并行实现甚至比单线程版本慢,这是因为大数据帧的复制操作非常昂贵。但是,对于那些4核的任务,上面的实现可以给那些有3倍以上的加速,这看起来非常接近真正的轻量级多线程。在

相关问题 更多 >