我最初的问题是关于Python下的并行性。然而,由于这个问题一直没有答案,我删除了它,并试图总结我的结论。希望它能帮助某人。。。在
一般来说,有两种主要的方法使代码并行运行-使用多线程或多处理库。在
根据上的许多帖子stackoverflow.com网站多线程库能够跨线程有效地共享内存,但在单个内核上运行线程。因此,如果瓶颈是I/O操作,它可以提高代码的速度。我不确定是否有许多图书馆的实际应用程序。。。在
你的CPU问题有时会被称为多处理器问题。库将线程分散到各个核心上。然而,许多人(包括我)观察到,这样的多核代码比单核代码要慢得多。这个问题被认为是由于单个线程不能有效地共享内存-数据被大量复制,这会造成相当大的开销。正如我下面的代码所示,开销很大程度上取决于输入数据类型。这个问题在Windows上比在Linux上要严重得多。我不得不说,并行性是我对Python最大的失望——显然Python的设计并没有考虑到并行性。。。在
第一段代码使用Process
在核心之间分配{
import numpy as np
import math as mth
import pandas as pd
import time as tm
import multiprocessing as mp
def bnd_calc_npv_dummy(bnds_info, core_idx, npv):
""" multiple core dummy valuation function (based on single core function) """
bnds_no = len(bnds_info)
tm.sleep(0.0001 * bnds_no)
npv[core_idx] = np.array(bnds_info['npv'])
def split_bnds_info(bnds_info, cores_no):
""" cut dataframe with bond definitions into pieces - one piece per core """
bnds_info_mp = []
bnds_no = len(bnds_info)
batch_size = mth.ceil(np.float64(bnds_no) / cores_no) # number of bonds allocated to one core
# split dataframe among cores
for idx in range(cores_no):
lower_bound = int(idx * batch_size)
upper_bound = int(np.min([(idx + 1) * batch_size, bnds_no]))
bnds_info_mp.append(bnds_info[lower_bound : upper_bound].reset_index().copy())
# return list of dataframes
return bnds_info_mp
def bnd_calc_npv(bnds_info, cores_no):
""" dummy valuation function running multicore """
manager = mp.Manager()
npv = manager.dict()
bnds_info_mp = split_bnds_info(bnds_info, cores_no)
processes = [mp.Process(target = bnd_calc_npv_dummy, args = (bnds_info_mp[core_idx], core_idx, npv)) for core_idx in xrange(cores_no)]
[process.start() for process in processes]
[process.join() for process in processes]
# return NPV of individual bonds
return np.hstack(npv.values())
if __name__ == '__main__':
# create dummy dataframe
bnds_no = 1200 # number of dummy in the sample
bnds_info = {'currency_name' : 'EUR', 'npv' : 100}
bnds_info = pd.DataFrame(bnds_info, index = range(1))
bnds_info = pd.concat([bnds_info] * bnds_no, ignore_index = True)
# one core
print("ONE CORE")
start_time = tm.time()
bnds_no = len(bnds_info)
tm.sleep(0.0001 * bnds_no)
npv = np.array(bnds_info['npv'])
elapsed_time = (tm.time() - start_time)
print(' elapsed time: ' + str(elapsed_time) + 's')
# two cores
print("TWO CORES")
cores_no = 2
start_time = tm.time()
npv = bnd_calc_npv(bnds_info, cores_no)
elapsed_time = (tm.time() - start_time)
print(' elapsed time: ' + str(elapsed_time) + 's')
# three cores
print("THREE CORES")
cores_no = 3
start_time = tm.time()
npv = bnd_calc_npv(bnds_info, cores_no)
elapsed_time = (tm.time() - start_time)
print(' elapsed time: ' + str(elapsed_time) + 's')
# four cores
print("FOUR CORES")
cores_no = 4
start_time = tm.time()
npv = bnd_calc_npv(bnds_info, cores_no)
elapsed_time = (tm.time() - start_time)
print(' elapsed time: ' + str(elapsed_time) + 's')
第二个代码与前面的代码相同-唯一的区别是这次我们使用numpy array
而不是{
最后一段代码使用Pool
,而不是Process
。运行时稍微好一点。在
import numpy as np
import time as tm
import multiprocessing as mp
#import pdb
#pdb.set_trace()
def bnd_calc_npv_dummy(bnds_info):
""" multiple core dummy valuation function (based on single core function) """
try:
# get number of bonds
bnds_no = len(bnds_info)
except:
pass
bnds_no = 1
tm.sleep(0.0001 * bnds_no)
return bnds_info
def bnd_calc_npv(bnds_info, cores_no):
""" dummy valuation function running multicore """
pool = mp.Pool(processes = cores_no)
npv = pool.map(bnd_calc_npv_dummy, bnds_info.tolist())
# return NPV of individual bonds
return npv
if __name__ == '__main__':
# create dummy dataframe
bnds_no = 1200 # number of dummy in the sample
bnds_info = np.array([100.0] * bnds_no)
# one core
print("ONE CORE")
start_time = tm.time()
bnds_no = len(bnds_info)
tm.sleep(0.0001 * bnds_no)
elapsed_time = (tm.time() - start_time)
print(' elapsed time: ' + str(elapsed_time) + 's')
# two cores
print("TWO CORES")
cores_no = 2
start_time = tm.time()
npv = bnd_calc_npv(bnds_info, cores_no)
elapsed_time = (tm.time() - start_time)
print(' elapsed time: ' + str(elapsed_time) + 's')
# three cores
print("THREE CORES")
cores_no = 3
start_time = tm.time()
npv = bnd_calc_npv(bnds_info, cores_no)
elapsed_time = (tm.time() - start_time)
print(' elapsed time: ' + str(elapsed_time) + 's')
# four cores
print("FOUR CORES")
cores_no = 4
start_time = tm.time()
npv = bnd_calc_npv(bnds_info, cores_no)
elapsed_time = (tm.time() - start_time)
print(' elapsed time: ' + str(elapsed_time) + 's')
因此,我的结论是Python实现的并行性并不适用于现实生活(我使用了python2.7.13和windows7)。 谨致问候
麦基
PS:如果有人能够更改代码,我将非常高兴地改变我的想法。。。在
当问题的一部分可以独立计算时,多处理的效果最好,例如使用
multiprocessing.Pool
。 池中的每个工作进程处理部分输入并将结果返回给主进程。在如果所有进程都需要修改整个输入数组中的数据,那么
manager
的同步开销很可能会破坏多处理带来的任何收益。在相关问题 更多 >
编程相关推荐