Finding the distance between points in arrays using multiprocessing

Posted 2024-09-30 14:24:10


I've been trying to use Python's multiprocessing library to speed up a practice project. In this project I have two arrays, points and weights, both holding x, y coordinates, and I'm trying to find the distance between each weight coordinate and its corresponding point. When I run the program with multiprocessing, it uses all of my computer's RAM and CPU, and Task Manager shows as many as 20 Python instances running. I know the logic itself works, because the program runs fine without multiprocessing; it just takes about 20 seconds to finish.

Here is the code; at the bottom are the versions that run with Pool.map and with Process from the multiprocessing library.

import math
import random
import multiprocessing as mp

screenSize = 1000000

pointsLength = 2000
weightLength = 20000

weightBuffer = screenSize // weightLength  # integer division: random.randint needs ints
pointBuffer = screenSize // pointsLength

points = []
weights = []
weightPoints = []

counter = 0

for i in range(pointsLength):
    for j in range(pointsLength):
        points.append([random.randint(j * pointBuffer, j * pointBuffer * 2),
                       random.randint(i * pointBuffer, i * pointBuffer * 2)])


for i in range(pointsLength):
    for j in range(pointsLength):
        weightPoints.append([j * weightBuffer, i * weightBuffer])
        weights.append(0)


def FindDistance(i):
    row = math.floor((i / weightLength) / (weightLength / pointsLength))
    col = math.floor((i % weightLength) / (weightLength / pointsLength))
    points1d = (pointsLength * row) + col

    dist = math.dist(points[points1d], weightPoints[i])

    weights[i] = dist

# With Multiprocessing Pool
# sumthing = []
# for i in range(len(weights)):
#     sumthing.append(i)

# with mp.Pool(4) as p:
#     p.map(FindDistance, sumthing)


# With Multiprocessing Process
processes = []
for i in range(len(weights)):
    p = mp.Process(target=FindDistance, args=[i])
    p.start()
    processes.append(p)

for process in processes:
    process.join()


# Without Multiprocessing
# for i in range(len(weights)):
#     FindDistance(i)

#     counter += 1

#     if (counter % 25000 == 0):
#         print(counter / 25000)

If anyone knows how to get the multiprocessing to work so that the program uses the 8 cores on my computer without crashing from RAM or CPU limits, I'd appreciate it.


3 Answers

You are iterating over the length of weights, which according to your code is 2000 * 2000 = 4,000,000 entries, and spawning a new process for every iteration. That means millions of processes. No wonder your CPU and RAM fill up.

What you need to do is chunk the work into 8 smaller arrays, ideally of equal length. Change the FindDistance function to take an array as its argument; that argument will be one of the smaller chunked arrays.

def FindDistance(i_arr):
    for i in i_arr:
        row = math.floor((i / weightLength) / (weightLength / pointsLength))
        col = math.floor((i % weightLength) / (weightLength / pointsLength))
        points1d = (pointsLength * row) + col

        dist = math.dist(points[points1d], weightPoints[i])

        weights[i] = dist

def chunking(indices):
    # Initialise a list of empty lists, one per processor
    smaller_chunks = [[] for _ in range(8)]
    for index, item in enumerate(indices):
        index_to_push = index % 8
        smaller_chunks[index_to_push].append(item)

    return smaller_chunks

processes = []
chunks = chunking(range(len(weights)))  # chunk the indices, not the weight values
for i in range(len(chunks)):
    p = mp.Process(target=FindDistance, args=[chunks[i]])
    p.start()
    processes.append(p)

for process in processes:
    process.join()
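A caveat worth adding to this answer: each Process gets its own copy of the interpreter state, so assignments to the plain weights list inside a child never reach the parent. A minimal sketch (toy sizes and a made-up fill function, not the question's arrays) of sharing a result buffer across processes with multiprocessing.Array:

```python
import multiprocessing as mp

def fill(shared, start, end):
    # Write squared indices into the shared buffer.
    for i in range(start, end):
        shared[i] = i * i

def run_demo(n=8, workers=2):
    # 'd' = C double; unlike a plain list, this buffer lives in shared memory.
    shared = mp.Array('d', n, lock=False)
    chunk = n // workers
    procs = [mp.Process(target=fill, args=(shared, w * chunk, (w + 1) * chunk))
             for w in range(workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return list(shared)

if __name__ == '__main__':
    print(run_demo())  # [0.0, 1.0, 4.0, 9.0, 16.0, 25.0, 36.0, 49.0]
```

With lock=False the buffer is unsynchronised, which is safe here only because each worker writes a disjoint slice of indices.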

        

The problem is that you are not doing the multiprocessing correctly. Specifically, your code is missing the if __name__ == '__main__': guard. The code below uses multiprocessing.Pool (which I think is the best and simplest way to do what you want) with that fixed. It still takes a few seconds to execute, but it no longer eats all your memory and CPU.

The information about why if __name__ == '__main__': is needed is somewhat hidden in the multiprocessing module documentation, in the "Safe importing of main module" subsection of the section on the spawn and forkserver start methods.

import math
import random
import multiprocessing as mp

screenSize = 1000000

pointsLength = 2000
weightLength = 20000

weightBuffer = screenSize // weightLength  # integer division: random.randint needs ints
pointBuffer = screenSize // pointsLength

points = []
weights = []
weightPoints = []

counter = 0

for i in range(pointsLength):
    for j in range(pointsLength):
        points.append([random.randint(j * pointBuffer, j * pointBuffer * 2),
                       random.randint(i * pointBuffer, i * pointBuffer * 2)])


for i in range(pointsLength):
    for j in range(pointsLength):
        weightPoints.append([j * weightBuffer, i * weightBuffer])
        weights.append(0)


def FindDistance(i):
    row = math.floor((i / weightLength) / (weightLength / pointsLength))
    col = math.floor((i % weightLength) / (weightLength / pointsLength))
    points1d = (pointsLength * row) + col

    dist = math.dist(points[points1d], weightPoints[i])

    weights[i] = dist


if __name__ == '__main__':  # ADDED

    # With Multiprocessing Pool
    sumthing = []
    for i in range(len(weights)):
        sumthing.append(i)

    with mp.Pool(4) as p:
        p.map(FindDistance, sumthing)
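One thing this answer's code still glosses over: with Pool, the workers also operate on copies of weights, so the parent's list stays all zeros after p.map returns. A small sketch (with toy points and a hypothetical dist_to_target helper, not the question's data) of collecting the distances from map's return value instead, which preserves input order:

```python
import math
import multiprocessing as mp

# Toy data standing in for the question's points/weightPoints arrays.
POINTS = [(0.0, 0.0), (3.0, 4.0), (6.0, 8.0)]
TARGET = (0.0, 0.0)

def dist_to_target(p):
    # Workers return the distance instead of mutating shared state.
    return math.dist(p, TARGET)

def all_distances(points, procs=2):
    with mp.Pool(procs) as pool:
        # map preserves input order, so result[i] pairs with points[i].
        return pool.map(dist_to_target, points)

if __name__ == '__main__':
    print(all_distances(POINTS))  # [0.0, 5.0, 10.0]
```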

The problem is that you are looping over the length of weights, which your code builds with

for i in range(pointsLength): #pointsLength is 2000
    for j in range(pointsLength):
        weightPoints.append([j * weightBuffer, i * weightBuffer])
        weights.append(0)

so its length is 2000 * 2000 = 4,000,000.

That means you are trying to create millions of new processes at the same time, which brings the system down. Instead, you can break the work into n smaller arrays and create n new processes, one per array.

We can split the array with the numpy function numpy.array_split. Now update the FindDistance function to accept a whole sub-array as input:

def FindDistance(subarr):
    for i in subarr:
        row = math.floor((i / weightLength) / (weightLength / pointsLength))
        col = math.floor((i % weightLength) / (weightLength / pointsLength))
        points1d = (pointsLength * row) + col

        dist = math.dist(points[points1d], weightPoints[i])

        weights[i] = dist

Finally, create the n processes with the new argument:

import numpy as np

n = 8  # one sub-array per process
# Split the indices rather than the (all-zero) weights values
subarrays = np.array_split(np.arange(len(weights)), n)
processes = []
for subarr in subarrays:
    p = mp.Process(target=FindDistance, args=[subarr])
    p.start()
    processes.append(p)

for process in processes:
    process.join()
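The manual splitting above can also be left to Pool itself: its map method takes a chunksize argument that batches the inputs internally. A sketch under the assumption that workers return their distances rather than writing into weights (child processes cannot mutate the parent's list), with a made-up distance_from_origin standing in for the question's per-point computation:

```python
import math
import multiprocessing as mp

def distance_from_origin(point):
    # Stand-in for the question's per-point distance computation.
    return math.dist(point, (0.0, 0.0))

def parallel_distances(points, procs=4):
    with mp.Pool(procs) as pool:
        # chunksize batches the inputs internally, so a handful of
        # processes work through many points without per-item dispatch.
        return pool.map(distance_from_origin, points, chunksize=256)

if __name__ == '__main__':
    pts = [(float(i), 0.0) for i in range(1000)]
    print(parallel_distances(pts)[:3])  # [0.0, 1.0, 2.0]
```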
