我需要加快处理任务的时间,这些任务涉及相当大的数据集,这些数据集是从高达1.5GB的带有CSV数据的大型pickle文件加载的。我从python的多处理开始,但对于不可pickle的类对象,我必须切换到pathos。我运行了一些并行代码,并复制了从通常的串行运行中获得的结果:到目前为止还不错。但是,当中央主节点全速运行,而实际的子进程(有时总共数百个)串行运行,而不是并行运行时,处理速度远远没有任何用处,因为主节点再次疯狂运行时,巨大的时间间隔将其隔开,这是为了什么?它“最好”使用ProcessPool,而apipe和amap是一样的,没有区别
下面是我的代码摘录,首先是并行部分,其次是串行部分。两者都给出相同的结果,但并行方法要慢得多。重要的是,每个并行子进程使用的时间与串行循环中的时间大致相同。所有变量都在长处理管道中提前加载
#### PARALLEL multiprocessing
if me.MP:
import pathos
cpuN = pathos.multiprocessing.cpu_count() - 1
pool = pathos.pools.ProcessPool( cpuN) # ThreadPool ParallelPool
argsIn1 = [] # a mid-large complex dictionary
argsIn2 = [] # a very large complex dictionary (CSV pickle of 400MB or more)
argsIn3 = [] # a list of strings
argsIn4 = [] # a brief string
for Uscr in UID_lst:
argsIn1.append( me)
argsIn2.append( Db)
argsIn3.append( UID_lst)
argsIn4.append( Uscr)
result_pool = pool.amap( functionX, argsIn1, argsIn2, argsIn3, argsIn4)
results = result_pool.get()
for result in results:
[ meTU, U] = result
me.v[T][U] = meTU[U] # insert result !
#### SERIAL processing
else:
for Uscr in UID_lst:
meTU, U = functionX( me, Db, UID_lst, Uscr)
me.v[T][U] = meTU[U] # insert result !
我在两台linux机器上测试了这段代码,一台i3CPU(32GB内存,slackware 14.2,python 3.7)和一台2*Xeon机器(64GB内存,slackware current,python 3.8)。pathos 0.2.6与pip3一起安装。如前所述,两台机器都显示了相同的速度问题,代码如下所示
我错过了什么
附录:似乎只有第一个PID在通过UID_lst中的所有项执行整个作业,而其他10个子进程处于空闲状态,什么都不等待,如top和os.getpid()所示。在本例中,cpuN为11
附录2:很抱歉这个新版本,但是在不同的负载下运行这个代码(需要解决的任务更多)最终会涉及到不止一个子进程的忙碌,但是仅仅经过了很长的时间!这里有一个顶部输出:
top - 14:09:28 up 19 days, 4:04, 3 users, load average: 6.75, 6.20, 5.08
Tasks: 243 total, 6 running, 236 sleeping, 0 stopped, 1 zombie
%Cpu(s): 48.8 us, 1.2 sy, 0.0 ni, 49.9 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
MiB Mem : 64061.6 total, 2873.6 free, 33490.9 used, 27697.1 buff/cache
MiB Swap: 0.0 total, 0.0 free, 0.0 used. 29752.0 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
5441 userx 20 0 6597672 4.9g 63372 S 100.3 7.9 40:29.08 python3 do_Db_job
5475 userx 20 0 6252176 4.7g 8828 R 100.0 7.5 9:24.46 python3 do_Db_job
5473 userx 20 0 6260616 4.7g 8828 R 100.0 7.6 17:02.44 python3 do_Db_job
5476 userx 20 0 6252432 4.7g 8828 R 100.0 7.5 5:37.52 python3 do_Db_job
5477 userx 20 0 6252432 4.7g 8812 R 100.0 7.5 1:48.18 python3 do_Db_job
5474 userx 20 0 6253008 4.7g 8828 R 99.7 7.5 13:13.38 python3 do_Db_job
1353 userx 20 0 9412 4128 3376 S 0.0 0.0 0:59.63 sshd: userx@pts/0
1354 userx 20 0 7960 4692 3360 S 0.0 0.0 0:00.20 -bash
1369 userx 20 0 9780 4212 3040 S 0.0 0.0 31:16.80 sshd: userx@pts/1
1370 userx 20 0 7940 4632 3324 S 0.0 0.0 0:00.16 -bash
4545 userx 20 0 5016 3364 2296 R 0.0 0.0 3:01.76 top
5437 userx 20 0 19920 13280 6544 S 0.0 0.0 0:00.07 python3
5467 userx 20 0 0 0 0 Z 0.0 0.0 0:00.00 [git] <defunct>
5468 userx 20 0 3911460 2.5g 9148 S 0.0 4.0 17:48.90 python3 do_Db_job
5469 userx 20 0 3904568 2.5g 9148 S 0.0 4.0 16:13.19 python3 do_Db_job
5470 userx 20 0 3905408 2.5g 9148 S 0.0 4.0 16:34.32 python3 do_Db_job
5471 userx 20 0 3865764 2.4g 9148 S 0.0 3.9 18:35.39 python3 do_Db_job
5472 userx 20 0 3872140 2.5g 9148 S 0.0 3.9 20:43.44 python3 do_Db_job
5478 userx 20 0 3844492 2.4g 4252 S 0.0 3.9 0:00.00 python3 do_Db_job
27052 userx 20 0 9412 3784 3052 S 0.0 0.0 0:00.02 sshd: userx@pts/2
27054 userx 20 0 7932 4520 3224 S 0.0 0.0 0:00.01 -bash
在我看来,在任何给定的时间,最多会运行6个子进程,这可能与psutil.cpu_count(logical=False)=6对应,而不是pathos.multiprocessing.cpu_count()=12
事实上,这个问题已经解决了——事实证明,在我的代码开发阶段,这个问题从来都不是第一位的。问题出在其他地方:工作进程提供的变量非常大,有时会有很多千兆字节。这种情况下,即使在新的双xeon机器上,使用变量dill/undill(如pickle/unpickle),主/中心节点也将永远处于繁忙状态,更不用说旧的I3CPU盒了。对于前者,我看到多达6或7个工人在工作(11个工人中有6或7个),而后者甚至没有超过1个工人在工作,即使是对于前者,也需要大量的时间,几十分钟,才能看到几个工人聚集在顶部
因此,我需要调整代码,以便每个工作人员都必须从磁盘/网络中重新读取巨大的变量-这也需要一些时间,但将中心节点从这个愚蠢的重复任务中解放出来是有意义的,而是让它有机会完成其设计的任务,即安排和组织工作人员的演出
我还高兴地说,与传统的串行版本相比,并行运行的结果是一个CSV文件(wc:36722 1133356 90870757)
说到这里,我真的很惊讶使用python/pathos是多么方便——而且不必在串行和并行运行之间更改相关的工作代码
相关问题 更多 >
编程相关推荐