并行共享内存与MPI锁定
pshmem的Python项目详细描述
具有共享内存的MPI设计模式
这是一个使用MPI单面和实现并行设计模式的小软件包 共享内存构造。在
安装及要求
此包需要mpi4py
包的最新版本才能有用。
但是,类也接受通信器的None
值,在这种情况下
使用琐碎的本地实现。该代码使用其他广泛可用的包
(像numpy)并且需要最近安装Python3。您可以从
git签出:
pip install .
或者:
^{pr2}$或者直接从github。在
MPIShared类
这个类实现了一个模式,其中在每个节点上分配一个共享数组。 进程可以使用同步的“set()”方法更新共享数组的片段。 在这个调用过程中,来自所需进程的数据首先被复制到所有节点, 然后每个节点上的一个进程将该片段复制到共享数组中。在
所有节点上的所有进程都可以从共享的节点本地副本自由读取数据 数组。在
示例
您可以使用MPIShared
作为上下文管理器,或者显式地创建和释放
记忆。下面是一个创建共享内存对象的示例,该对象跨
节点:
importnumpyasnpfrommpi4pyimportMPIfrompshmemimportMPISharedcomm=MPI.COMM_WORLDwithMPIShared((3,5),np.float64,comm)asshm:# A copy of the data exists on every node and is initialized to zero.# There is a numpy array "view" of that memory available with slice notation# or by accessing the "data" member:ifcomm.rank==0:# You can get a summary of the data by printing it:print("String representation:\n")print(shm)print("\n===== Initialized Data =====")forpinrange(comm.size):ifp==comm.rank:print("rank {}:\n".format(p),shm.data,flush=True)comm.barrier()set_data=Noneset_offset=Noneifcomm.rank==0:set_data=np.arange(6,dtype=np.float64).reshape((2,3))set_offset=(1,1)# The set() method is collective, but the inputs only matter on one rankshm.set(set_data,offset=set_offset,fromrank=0)# You can also use the usual '[]' notation. However, this call must do an# additional pre-communication to detect which process the data is coming from.# And this line is still collective and must be called on all processes:shm[set_offset]=set_data# This updated data has now been replicated to the shared memory on all nodes.ifcomm.rank==0:print("======= Updated Data =======")forpinrange(comm.size):ifp==comm.rank:print("rank {}:\n".format(p),shm.data,flush=True)comm.barrier()# You can read from the node-local copy of the data from all processes,# using either the "data" member or slice access:ifcomm.rank==comm.size-1:print("==== Read-only access ======")print("rank {}: shm[2, 3] = {}".format(comm.rank,shm[2,3]),flush=True)print("rank {}: shm.data = \n{}".format(comm.rank,shm.data),flush=True)
将上述代码放入一个文件test.py
,并在4个进程上运行此代码,可以得到:
mpirun -np 4 python3 test.py
String representation:
<MPIShared
replicated on 1 nodes, each with 4 processes (4 total)
shape = (3, 5), dtype = float64
[ [0. 0. 0. 0. 0.] [0. 0. 0. 0. 0.] [0. 0. 0. 0. 0.] ]
>
===== Initialized Data =====
rank 0:
[[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0.]]
rank 1:
[[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0.]]
rank 2:
[[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0.]]
rank 3:
[[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0.]]
======= Updated Data =======
rank 0:
[[0. 0. 0. 0. 0.]
[0. 0. 1. 2. 0.]
[0. 3. 4. 5. 0.]]
rank 1:
[[0. 0. 0. 0. 0.]
[0. 0. 1. 2. 0.]
[0. 3. 4. 5. 0.]]
rank 2:
[[0. 0. 0. 0. 0.]
[0. 0. 1. 2. 0.]
[0. 3. 4. 5. 0.]]
rank 3:
[[0. 0. 0. 0. 0.]
[0. 0. 1. 2. 0.]
[0. 3. 4. 5. 0.]]
==== Read-only access ======
rank 3: shm[2, 3] = 5.0
rank 3: shm.data =
[[0. 0. 0. 0. 0.]
[0. 0. 1. 2. 0.]
[0. 3. 4. 5. 0.]]
请注意,如果不使用上下文管理器,则应小心关闭 然后像这样删除对象:
shm=MPIShared((3,5),np.float64,comm=comm)# Do stuffshm.close()delshm
MPILock类
这在任意通信器上实现互斥锁。内存缓冲区 单个进程充当等待列表,进程可以在其中添加自己(使用 单方通话)。进程传递一个令牌来转移锁的所有权。这个 令牌按请求的顺序传递。在
示例
一个典型的用例是我们想要在大量的 驻留在不同节点上的进程。例如,也许我们正在提出请求 从计算中心到外部网络,我们不想让它饱和 所有过程同时进行。或者我们正在写入一个共享数据文件 不支持并行写入,我们有一个编写过程的子通讯器 轮流更新文件系统。我们可以实例化任何 沟通者,所以有可能把世界上的沟通者分成小组 某些操作仅在该组内序列化:
withMPILock(MPI.COMM_WORLD)asmpilock:mpilock.lock()# Do something here. Only one process at a time will do this.mpilock.unlock()
测试
安装后,可以使用以下工具运行一些测试:
mpirun -np 4 python3 'import pshmem.test; pshmem.test.run()'
如果您有mpi4py可用,但希望在中显式禁用MPI 测试时,可以设置环境变量:
MPI_DISABLE=1 python3 -c 'import pshmem.test; pshmem.test.run()'
- 项目
标签: