CFD python代码的并行化、多线程和多处理、CPU受限、笔记本电脑崩溃

2024-10-05 17:40:03 发布

您现在位置:Python中文网/ 问答频道 /正文

原员额如下:

我正在尝试运行一个按顺序运行的CFD(计算流体动力学)代码,但我想通过并行化来加快速度。(因此,我们感兴趣的是,我们正在计算三维的热传递)。 更准确地说,我必须并行放置一个3和4的嵌套循环。据我所知,我的代码是CPU限制的,所以我必须使用多处理而不是线程,到目前为止我是对的吗? 我已经尝试了几种方法来实现这一点,甚至线程只是以防万一。 实际上,我无法完成计算,每次我尝试时,我的笔记本电脑都冻结了,但是代码被编译并执行了。。。我有一个东芝卫星,有英特尔®核心™ Ubuntu20.04 LTS上2.50GHz×4、1.0TB和3.7Gib的i7-6500U CPU

第一次测试:

import multiprocessing as mp 

def iterative(i,j,k):  
     ind = j*(N+2)+i+k*((N+1)*(N+2)+N+2)
     indW = ind-1
     indE = ind+1
     indN = ind + (N+2) 
     indS = ind - (N+2)
     indT = ind + ((N+1)*(N+2)+N+2)
     indB = ind - ((N+1)*(N+2)+N+2)

     if j == 0 and k != 0 and k != N+1:
            T[ind] = T_F
     elif i == N+1 and j != N+1 and k != N+1 and k != 0:
            T[ind] = T_R
     elif i == 0 and j != 0 and k != 0 and k != N+1:
            T[ind] = T_L
     elif i != 0 and j == N+1 and k != 0 and k != N+1:
            T[ind] = T_B
     elif k == N+1:
            T[ind] = T_U
     elif k == 0:
            T[ind] = T_D
     elif i == N and j == N and k == N:
            T[ind] = T[ind] * (1-9*c) + T_R * (2*c) + T[indW]*c + T_B *(2*c) + c*T[indS] + 2*c*T_U + c*T[indB] + s*dt
     elif i == 1 and j == N and k == N: 
            T[ind] = T[ind] * (1-9*c) + T_L * (2*c) + c*T[indE] + 2*c*T_B + c*T[indS] + 2*c*T_U + c*T[indB] + s*dt
     elif i == N and j == N and k == 1:
            T[ind] = T[ind] * (1-9*c) + 2*c*T_R + c*T[indW] + 2*c*T_B  + c*T[indS] + 2*c*T_D + c*T[indT] + s*dt
     elif i == 1 and j == N and k == 1:
            T[ind] = T[ind] * (1-9*c) + 2*c*T_L + c*T[indE] + 2*c*T_B + c*T[indS] + 2*c*T_D + c*T[indT] + s*dt
     elif i != 0 and i != N+1 and j == 1 and k == N:
            T[ind] = T[ind] * (1-8*c) + c*T[indE] + c*T[indW] + c*T[indN] + 2*c*T_F + 2*c*T_U + c*T[indB] + s*dt
   
          ...
          ...

processes = []

for n in range(0,8):
    for i in range(0,N+2):
        for j in range(0,N+2):
            for k in range(0,N+2):
                p = mp.Process(target=iterative,(i,j,k))
                p.start()
                Processes.append(p)

for p in processes:
    p.join()

              ....
              ....

为什么它会冻结/崩溃

我还尝试创建这样的进程池:


pool = multiprocessing.Pool(multiprocessing.cpu_count())

for n in range(0,N+2):
    for i in range(0,N+2):
        for j in range(0,N+2):
            for k in range(0,N+2):
                pool.apply_async(initial,(i,j,k))
    ......
    ......

同样的事情,它崩溃了

对于线程,它不会崩溃,但计算大约需要600秒,而顺序计算大约需要一分钟。。。我认为这是因为我的代码受CPU限制(而不是I/O限制),设置所有线程需要时间。。。我说得对吗? 所以,我也用进程而不是线程做了这件事,但它再次崩溃了。。。 嗯,我不知道如何使它工作,而不是撞坏我的笔记本电脑,这不是一个垃圾的先验知识。 有人能帮我做这个吗?谢谢你抽出时间

编辑如下:

正如@Jerôme Richard在评论中向我建议的那样,我通过增加并行执行的一个任务中的操作数(增加粒度),在每个单元中创建更少的任务。这大大提高了计算速度。多线程计算的结果是好的,比顺序计算的结果快一点,但这种方法不是真正的并行化方法。所以我想使用多核的多处理来获得最大的计算时间。为了减少进程之间的通信时间,我考虑按顺序计算“初始”函数,因为这项工作非常快,而且只有迭代循环是并行的。因此,有了所有这些变化,我的笔记本电脑不再崩溃,计算速度更快,但在多处理的情况下,并行迭代循环似乎不起作用,因为结果仍然是初始化的结果。我尝试使用和不使用进程池,但它返回相同的结果。我不知道问题出在哪里,也许有人能看到

下面是我的代码中的一些更改,首先是进程池:


import multiprocessing as mp
       ...
       ...
def iterative(i):   
    for j in range(0,N+2):
        for k in range(0,N+2):   
            ind = j*(N+2)+i+k*((N+1)*(N+2)+N+2)
            indW = ind-1
            indE = ind+1
            indN = ind + (N+2) 
            indS = ind - (N+2)
            indT = ind + ((N+1)*(N+2)+N+2)
            indB = ind - ((N+1)*(N+2)+N+2)

            if j == 0 and k != 0 and k != N+1:
                T[ind] = T_F
            elif i == N+1 and j != N+1 and k != N+1 and k != 0:
                T[ind] = T_R
            elif i == 0 and j != 0 and k != 0 and k != N+1:
                T[ind] = T_L
            elif i != 0 and j == N+1 and k != 0 and k != N+1:
                T[ind] = T_B
            elif k == N+1:
                T[ind] = T_U
            elif k == 0:
                T[ind] = T_D
            elif i == N and j == N and k == N:
                T[ind] = T[ind] * (1-9*c) + T_R * (2*c) + T[indW]*c + T_B *(2*c) + c*T[indS] + 2*c*T_U + c*T[indB] + s*dt
            ...
            ...

pool2 = mp.Pool(mp.cpu_count())

for n in range(0,8): # time iterative loop
    print(n)

    for i in range(0,N+2):
         pool2.apply_async(iterative,[i])
    if n == 7:
        with open("chaleur_para.txt","w") as file:
            for i in range(0,(N+2)**3):
                file.write(str(T[i]) + "\n")
            file.close()
    im = plt.contourf(X,Z,M.T,100,cmap='plasma')
    text = ['t = ' + str(n*dt) + ' s']
    plt.legend(text,loc='upper left')
    ims.append(im.collections)
    if n == 7:
        print("No more iteration.")


pool2.close()
pool2.join()




ani = animation.ArtistAnimation(fig, ims, interval = 1000, blit=True)
print(f"Exectuion time is {time.time()-t1} seconds.")
plt.xlabel('x')
plt.ylabel('z')
plt.xlim(-0.05,1.05)
plt.ylim(-0.05,1.05)
plt.colorbar(label='Température en °K')
ani.save("Evolution_T_2D.mp4")
plt.show()

没有游泳池:

import multiprocessing as mp

         ...
         ...
Processes = []

for n in range(0,8): # time iterative loop
    print(n)
    for i in range(0,N+2):
        p = mp.Process(target=iterative,args=[i])
        p.start()
        Processes.append(p)

    for p in Processes:
        p.join()
    if n == 7:
        with open("chaleur_para.txt","w") as file:
            for i in range(0,(N+2)**3):
                file.write(str(T[i]) + "\n")
            file.close()
    im = plt.contourf(X,Z,M.T,100,cmap='plasma')
    text = ['t = ' + str(n*dt) + ' s']
    plt.legend(text,loc='upper left')
    ims.append(im.collections)
    if n == 7:
        print("No more iteration.")

       ....
       ....

有人有主意吗?提前谢谢


Tags: andinforifdtrangepltmp