Optimizing Fortran code that runs slower than its Python version


I have some Fortran code that I compile with f2py to call from Python. I wrote it to speed up pre-existing Python code, but it actually runs slower than that Python code, which makes me think it is not optimized. (This may be related to this question, although the culprit identified there is not something I use here, so it does not apply to my case.)

The code takes a 4D array and performs a 2D correlation over dimensions 3 and 4 (as if they were x and y).
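
In numpy terms, the quantity being computed is roughly the following (only a sketch for clarity, not the actual Python version I am comparing against; np.roll with a negative shift plays the role of Fortran's cshift):

import numpy as np

def correlate4d_numpy(Vars, nxc, nyc):
    # Vars has shape (nv, nt, nx, ny); nxc, nyc are the correlation half-widths
    nv, nt, nx, ny = Vars.shape
    mean = Vars.mean(axis=(2, 3))                          # spatial mean, shape (nv, nt)
    vcorr = np.empty((nv, nt, 2*nxc + 1, 2*nyc + 1))
    for iiy, ydel in enumerate(range(-nyc, nyc + 1)):
        prerolled = np.roll(Vars, -ydel, axis=3)           # ~ cshift(Vars, ydel, dim=4)
        for iix, xdel in enumerate(range(-nxc, nxc + 1)):
            rolled = np.roll(prerolled, -xdel, axis=2)     # ~ cshift(prerolled, xdel, dim=3)
            vcorr[:, :, iix, iiy] = (Vars * rolled).mean(axis=(2, 3)) / mean**2
    return vcorr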

The Fortran code is as follows:

SUBROUTINE correlate4D(Vars, nxc, nyc, nv, nt, vCorr)
! Correlates 4D array assuming that x, y are dims 3 and 4
! Returns a 4D array with shape nv, nt, 2*nxc+1, 2*nyc+1
IMPLICIT NONE
real(kind=8), intent(in), dimension(:,:,:,:) :: Vars
real(kind=8) :: dummysize
integer, intent(in) :: nxc, nyc
integer :: ii, i, iix, iiy, iv, it, dims(4), nv, nt, nx, ny
integer, dimension(2*nxc+1) :: xdel
integer, dimension(2*nyc+1) :: ydel
real(kind=8), intent(out) :: vCorr(nv, nt, 2*nxc+1, 2*nyc+1)
real(kind=8), dimension(:,:,:,:), allocatable :: rolled, prerolled
real(kind=8), dimension(:,:), allocatable :: Mean
real(kind=8), dimension(:,:,:), allocatable :: Mean3d

dims = shape(Vars)
nx=dims(3)
ny=dims(4)
dummysize=nx*ny
allocate(rolled(nv, nt, nx, ny))
allocate(prerolled(nv, nt, nx, ny))
allocate(Mean3d(nv, nt, nx))
allocate(Mean(nv, nt))

Mean3d = sum(Vars, dim=4)/size(Vars, dim=4)
Mean = sum(Mean3d, dim=3)/size(Mean3d, dim=3)

! These replace np.arange()
ii=1
do i=-nxc,+nxc
    xdel(ii)=i
    ii=ii+1
enddo
ii=1
do i=-nyc,+nyc
    ydel(ii)=i
    ii=ii+1
enddo

! Calculate the correlation
do iiy=1,size(ydel)
    print*,'fortran loop:',iiy,' of', size(ydel)
    ! cshift replaces np.roll()
    prerolled = cshift(Vars, ydel(iiy), dim=4)
    do iix=1,size(xdel)
        rolled = cshift(prerolled, xdel(iix), dim=3)
        forall (it=1:nt)
            forall (iv=1:nv)
                vCorr(iv,it,iix,iiy) = (sum(Vars(iv,it,:,:) * rolled(iv,it,:,:))/dummysize) / (Mean(iv,it)**2)
            endforall
        endforall
    enddo
enddo

END SUBROUTINE

Running it with an array of size (3, 50, 100, 100), the f2py-compiled code takes 251 seconds, while the pure Python/numpy code takes only 103 seconds. By the way, that is roughly the average size of the input; it should eventually be about (3, 300, 100, 100), and will not get much bigger than that.

Can anyone point me to ways of optimizing this code?

Edit

I compile it with f2py3.4 -c mwe.f90 -m mwe and then call the resulting mwe module from Python.

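For reference, a minimal calling sketch could look like this (the array sizes and half-widths are made up, and the exact signature that f2py generates can be checked with print(mwe.correlate4d.__doc__)):

import numpy as np
import mwe  # extension module built by: f2py3.4 -c mwe.f90 -m mwe

nv, nt, nx, ny = 3, 50, 100, 100
Vars = np.asfortranarray(np.random.rand(nv, nt, nx, ny))  # Fortran order avoids an extra copy
nxc, nyc = 10, 10  # assumed correlation half-widths

# f2py exposes Fortran names in lowercase; nv and nt size the output array
vCorr = mwe.correlate4d(Vars, nxc, nyc, nv, nt)
print(vCorr.shape)  # expected (nv, nt, 2*nxc + 1, 2*nyc + 1)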

Edit 2

After reading the comments, I managed to improve it by changing the order of the indices. It is now about 10% faster than Python, but that is still too slow. I believe this can be made much faster.

SUBROUTINE correlate4D2(Vars, nxc, nyc, nt, nv, vCorr)
! Correlates 4D array assuming that x, y are dims 1 and 2
! Returns a 4D array with shape 2*nxc+1, 2*nyc+1, nt, nv
IMPLICIT NONE
INTEGER, PARAMETER  ::  dp = SELECTED_REAL_KIND (13)
real(kind=8), intent(in), dimension(:,:,:,:) :: Vars
real(kind=8) :: dummysize
integer, intent(in) :: nxc, nyc
integer :: ii, i, iix, iiy, iv, it, dims(4), nv, nt, nx, ny
integer, dimension(2*nxc+1) :: xdel
integer, dimension(2*nyc+1) :: ydel
!real(kind=8), intent(out) :: vCorr(nv, nt, 2*nxc+1, 2*nyc+1)
real(kind=8), intent(out) :: vCorr(2*nxc+1, 2*nyc+1, nt, nv)
real(kind=8), dimension(:,:,:,:), allocatable :: rolled, prerolled
real(kind=8), dimension(:,:), allocatable :: Mean
real(kind=8), dimension(:,:,:), allocatable :: Mean3d

dims = shape(Vars)
nx=dims(1)
ny=dims(2)
dummysize=nx*ny
allocate(rolled(nx, ny, nt, nv))
allocate(prerolled(nx, ny, nt, nv))
allocate(Mean3d(ny, nt, nv))
allocate(Mean(nt, nv))

Mean3d = sum(Vars, dim=1)/size(Vars, dim=1)
Mean = sum(Mean3d, dim=1)/size(Mean3d, dim=1)

ii=1
do i=-nxc,+nxc
    xdel(ii)=i
    ii=ii+1
enddo
ii=1
do i=-nyc,+nyc
    ydel(ii)=i
    ii=ii+1
enddo

do iiy=1,size(ydel)
    print*,'fortran loop:',iiy,' of', size(ydel)
    prerolled = cshift(Vars, ydel(iiy), dim=2)
    do iix=1,size(xdel)
        rolled = cshift(prerolled, xdel(iix), dim=1)
        forall (iv=1:nv)
            forall (it=1:nt)
                vCorr(iix,iiy,it,iv) = (sum(Vars(:,:,it,iv) * rolled(:,:,it,iv))/dummysize) / (Mean(it,iv)**2)
            endforall
        endforall
    enddo
enddo

END SUBROUTINE

Also, even though the code now contains a dp parameter (which should evaluate to 8), if I declare the variables with real(dp), f2py throws the error Parameter 'dp' at (1) has not been declared or is a variable, even though it is declared. That is why I use 8 directly.
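
A possible workaround (not tried with this code) is to give f2py a kind map by placing a .f2py_f2cmap file next to the source; the file simply contains a Python dict mapping the kind name to a C type, for example:

{'real': {'dp': 'double'}}

With that file in the build directory, real(dp) declarations should then be accepted by f2py.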


1 Answer

Note: what follows is a long and tedious answer...

Because applying cshift() repeatedly to large arrays seems to be expensive, I tried some alternatives to cshift. To do so, I first wrote a minimal version of the OP's code:

program main
    implicit none
    integer, parameter :: N = 100, nt = 50, dp = kind(0.0d0)
    real(dp), allocatable, dimension(:,:,:) :: A, Ashift_y, Ashift, B
    integer :: sx, sy, i, t

    allocate( A( N, N, nt ), Ashift_y( N, N, nt ), Ashift( N, N, nt ), &
              B( -N:N-1, -N:N-1, nt ) )
    call initA

    do sy = -N, N-1
        if ( mod( sy, N/10 ) == 0 ) print *, "sy = ", sy

        Ashift_y = cshift( A, sy, dim=2 )

        do sx = -N, N-1
            Ashift = cshift( Ashift_y, sx, dim=1 )

            do t = 1, nt
                B( sx, sy, t )= sum( A( :, :, t ) * Ashift( :, :, t ) )
            enddo
        enddo
    enddo

    print *, "sum(B) = ", sum(B)
    print *, "sum( B( 0:N-1, 0:N-1, : ) ) = ", sum( B( 0:N-1, 0:N-1, : ) )

contains
    subroutine initA
        integer ix, iy
        forall( t = 1:nt, iy = 1:N, ix = 1:N )  &   ! (forall not recommended but for brevity)
                A( ix, iy, t ) = 1.0_dp / ( mod(ix + iy + t, 100) + 1 )
    endsubroutine    
endprogram

This gives

sum(B) =    53817771.021093562
sum( B( 0:N-1, 0:N-1, : ) ) =    13454442.755258575

which serve as the reference values for the reduced versions below.

Next, because cshift(A, s, dim=1 (or 2)) is periodic in the shift s with period N, the loops over sx and sy can be split into four quadrants, and only the first quadrant (i.e. shifts in [0, N-1]) needs to be computed; the data for the other quadrants are obtained simply by copying the first quadrant. This cuts the CPU time by a factor of 4. (Even more simply, we could compute sx and sy only over [-N/2, N/2], because B outside that range carries no new information.)

    do sy = 0, N-1
        if ( mod( sy, N/10 ) == 0 ) print *, "sy = ", sy
        Ashift_y = cshift( A, sy, dim=2 )

        do sx = 0, N-1
            Ashift = cshift( Ashift_y, sx, dim=1 )

            do t = 1, nt
                B( sx, sy, t )= sum( A( :, :, t ) * Ashift( :, :, t ) )
            enddo
        enddo
    enddo

    print *, "sum( B( 0:N-1, 0:N-1, : ) ) = ", sum( B( 0:N-1, 0:N-1, : ) )

    !! Make "full" B.
    B( -N :  -1,  0 : N-1, : ) = B( 0 : N-1, 0 : N-1, : )
    B(  0 : N-1, -N :  -1, : ) = B( 0 : N-1, 0 : N-1, : )
    B( -N :  -1, -N :  -1, : ) = B( 0 : N-1, 0 : N-1, : )
    print *, "sum(B) = ", sum(B)

The result agrees with the full calculation, as expected:

sum(B) =    53817771.021093562     
sum( B( 0:N-1, 0:N-1, : ) ) =    13454442.755258575     

Mac   : 12.8 sec
Linux :  8.3 sec

The corresponding Python code might look like this:

from __future__ import print_function, division
import numpy as np

N, nt = 100, 50

A = np.zeros( (nt, N, N) )
B = np.zeros( (nt, N, N) )

for t in range(nt):
    for iy in range(N):
        for ix in range(N):
            A[ t, iy, ix ] = 1.0 / ( (ix + iy + t) % 100 + 1 )

for sy in range( N ):
    if sy % (N // 10) == 0 : print( "sy = ", sy )
    Ashift_y = np.roll( A, -sy, axis=1 )

    for sx in range( N ):
        Ashift = np.roll( Ashift_y, -sx, axis=2 )

        for t in range( nt ):
            B[ t, sy, sx ] = np.sum( A[ t, :, : ] * Ashift[ t, :, : ] )

print( "sum( B( :, 0:N-1, 0:N-1 ) ) = ",  np.sum( B ) )

It takes 22-24 seconds on both Mac and Linux (Python 3.5).

To reduce the cost further, we can exploit the fact that cshift can be used in two equivalent ways:

cshift( array, s ) == array( cshift( [1,2,...,n], s ) )   !! assuming that "array" is declared as array( n )

We can then rewrite the code above so that cshift() only ever receives the index array ind = [1,2,...,N]:

    integer, dimension(N) :: ind, indx, indy
    ind = [( i, i=1,N )]

    do sy = 0, N-1
        if ( mod( sy, N/10 ) == 0 ) print *, "sy = ", sy
        indy = cshift( ind, sy )

        do sx = 0, N-1
            indx = cshift( ind, sx )

            do t = 1, nt
                B( sx, sy, t )= sum( A( :, :, t ) * A( indx, indy, t ) )
            enddo
        enddo
    enddo

This runs in about 5 seconds on both Mac and Linux. A similar approach also works in Python (a numpy sketch is given below). (I also tried using mod() explicitly for the indices to eliminate cshift entirely, but somewhat surprisingly that was more than twice as slow as the code above...)
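
For reference, the same index-array trick in numpy might look like the following (using the (nt, N, N) layout of the Python code above; the fancy-indexing line plays the role of A( indx, indy, t )):

import numpy as np

N, nt = 100, 50

# same test array as in the Python code above, built without loops
ix = np.arange(N)
iy = np.arange(N)[:, None]
t = np.arange(nt)[:, None, None]
A = 1.0 / ((ix + iy + t) % 100 + 1)

B = np.zeros((nt, N, N))
ind = np.arange(N)

for sy in range(N):
    indy = np.roll(ind, -sy)               # shifted row indices instead of rolling A
    for sx in range(N):
        indx = np.roll(ind, -sx)           # shifted column indices
        # fancy indexing: Ashift[t, i, j] == A[t, (i + sy) % N, (j + sx) % N]
        Ashift = A[:, indy[:, None], indx[None, :]]
        B[:, sy, sx] = (A * Ashift).sum(axis=(1, 2))

print("sum(B) =", B.sum())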

Even with this reduction, the code becomes slow for large nt (say 300, as given in the question). In that case we can resort to the final weapon and parallelize the loop over sy:

program main
    implicit none
    integer, parameter :: N = 100, nt = 50, dp = kind(0.0d0)
!    integer, parameter :: N = 100, nt = 300, dp = kind(0.0d0)
    real(dp), allocatable, dimension(:,:,:) :: A, B
    integer, dimension(N) :: ind, indx, indy
    integer :: sx, sy, i, t

    allocate( A( N, N, nt ), B( -N:N-1, -N:N-1, nt ) )
    call initA
    ind = [( i, i=1,N )]

    !$omp parallel do private(sx,sy,t,indx,indy)
    do sy = 0, N-1
        if ( mod( sy, N/10 ) == 0 ) print *, "sy = ", sy
        indy = cshift( ind, sy )

        do sx = 0, N-1
            indx = cshift( ind, sx )

            do t = 1, nt
                B( sx, sy, t )= sum( A( :, :, t ) * A( indx, indy, t ) )
            enddo
        enddo
    enddo
    !$omp end parallel do

    print *, "sum( B( 0:N-1, 0:N-1, : ) ) = ", sum( B( 0:N-1, 0:N-1, : ) )

    ! "contains subroutine initA ..." here

endprogram

The timings are as follows (compiled with gfortran -O3 -fopenmp):

N = 100, nt = 50
sum( B( 0:N-1, 0:N-1, : ) ) =    13454442.755258575     
Mac:
   serial : 5.3 sec
2 threads : 2.6 sec
4 threads : 1.4 sec

N = 100, nt = 50
sum( B( 0:N-1, 0:N-1, : ) ) =    13454442.755258575     
Linux:
    serial : 4.8 sec
 2 threads : 2.7 sec
 4 threads : 1.3 sec
 8 threads : 0.7 sec
16 threads : 0.4 sec
32 threads : 0.4 sec

N = 100, nt = 300   // heavy case
sum( B( 0:N-1, 0:N-1, : ) ) =    80726656.531429410     
Linux:
 2 threads: 16.5 sec
 4 threads:  8.4 sec
 8 threads:  4.4 sec
16 threads:  2.5 sec

So, provided the code above has no bugs (hopefully!), we can greatly reduce the cost of the original approach by (1) restricting sx and sy to [0, N-1] (or, even more simply, to [-N/2, N/2] with no copying needed), (2) applying cshift to an index array rather than to the data array, and/or (3) parallelizing the loop over sy (which hopefully can be combined with f2py...).
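
As for combining the OpenMP version with f2py, the usual recipe (not tested with this particular code; flag names depend on the compiler) is to pass the OpenMP flags at build time and choose the thread count via OMP_NUM_THREADS, for example:

import os

# Assumed build command (gfortran + libgomp; exact flags depend on the toolchain):
#   f2py -c --f90flags="-fopenmp" -lgomp mwe.f90 -m mwe

# Exporting OMP_NUM_THREADS in the shell before starting Python is the most
# reliable way to choose the thread count; setting it here also works as long
# as no OpenMP runtime has been initialized yet in this process.
os.environ["OMP_NUM_THREADS"] = "8"

import mwe  # the OpenMP-enabled wrapper built above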
