<p>如果保持进程间通信到
最小值。因此,与其将子数据帧作为参数传递,不如传递
索引值。子进程可以分割公共数据帧本身。在</p>
<p>生成子进程时,它将获得在
调用父进程的模块。因此,如果大数据帧<code>df</code>是
在globals中定义<em>之前</em>生成一个多处理池,然后每个
派生的子进程将有权访问<code>df</code>。在</p>
<p>在没有<code>fork()</code>的Windows上,将启动一个新的python进程,并且
调用模块已导入。因此,在Windows上,派生的子进程必须
从头开始重新生成<code>df</code>,这可能需要时间和大量额外的内存。在</p>
<p>但是,在Linux上,您有“写时拷贝”功能。这意味着
子进程访问(调用模块的)原始全局变量,而不访问
复制他们。只有当子进程试图修改全局时,Linux才会这样做
然后在修改该值之前创建一个单独的副本。在</p>
<p>因此,如果您避免在
子流程。我建议只使用子进程进行计算。返回
值的计算,并让主进程整理结果进行修改
原始数据帧。在</p>
<pre><code>import pandas as pd
import numpy as np
import multiprocessing as mp
import time
def compute(start, end):
sub = df.iloc[start:end]
return start, end, np.abs(sub['column_01']+sub['column_01']) / 2
def collate(retval):
start, end, arr = retval
df.ix[start:end, 'new_column'] = arr
def window(seq, n=2):
"""
Returns a sliding window (of width n) over data from the sequence
s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...
"""
for i in range(len(seq)-n+1):
yield tuple(seq[i:i+n])
if __name__ == "__main__":
result = []
# the record count of the real data is over 1 billion with about 10 columns.
N = 10**3
df = pd.DataFrame(np.random.randn(N, 4),
columns=['column_01', 'column_02', 'column_03', 'column_04'])
pool = mp.Pool()
df['new_column'] = np.empty(N, dtype='float')
start_time = time.time()
idx = np.linspace(0, N, 5+1).astype('int')
for start, end in window(idx, 2):
# print(start, end)
pool.apply_async(compute, args=[start, end], callback=collate)
pool.close()
pool.join()
print 'elapsed time : ', np.round(time.time() - start_time,3)
print(df.head())
</code></pre>