两个一边有填充的三维数组的卷积

2024-09-27 20:20:11 发布

您现在位置:Python中文网/ 问答频道 /正文

在我当前的项目中,我需要以一种稍微不寻常的方式“卷积”两个三维数组:

假设我们有两个三维数组A和B,其维数为dimA和dimB(每个轴都相同)。现在我们要创建第三个数组C,每个轴的尺寸为dimA+dimB。在

C项计算如下:

c_{x1+x2,y1+y2,z1+z2} += a_{x1,y1,z1} * b_{x2,y2,z2}

我现在的版本很简单:

^{pr2}$

不幸的是,这个版本很慢,不可用。在

我的第二个版本是:

C = scipy.signal.fftconvolve(A,B,mode="full")

但这只计算元素max(dimA,dimB)

有没有更好的主意?在


Tags: 项目版本尺寸方式数组卷积x1x2
3条回答

上面的Numba方法是一个很好的技巧,但只对相对较小的N有利。无论实现的速度有多快,一个O(N^6)算法每次都会杀死你。在我的测试中,fftconvolve方法在N=20左右交叉更快,而在N=32时是10倍。省略上面custom_convolution的定义:

from timeit import Timer
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import fftconvolve
from numba import jit, double

# Numba'ing the function with the JIT compiler
numba_convolution = jit(double[:, :, :](double[:, :, :],
                        double[:, :, :]))(custom_convolution)

def fft_convolution(A, B):
    return fftconvolve(A, B, mode='full')

if __name__ == '__main__':
    reps = 3
    nt, ft = [], []
    x = range(2, 34)
    for N in x:
        print N
        A = np.random.rand(N, N, N)
        B = np.random.rand(N, N, N)
        C1 = numba_convolution(A, B)
        C2 = fft_convolution(A, B)
        assert np.allclose(C1[:-1, :-1, :-1], C2)
        t = Timer(lambda: numba_convolution(A, B))
        nt.append(t.timeit(number=reps))
        t = Timer(lambda: fft_convolution(A, B))
        ft.append(t.timeit(number=reps))
    plt.plot(x, ft, x, nt)
    plt.show()

它也非常依赖于N,因为对于2的幂次,FFT要快得多。对于N=17到N=32,FFT版本的时间基本上是恒定的,在N=33时,它仍然更快,此时它又开始快速发散。在

您可以尝试用Numba包装FFT实现,但不能直接用scipy版本实现。在

(很抱歉创建新答案,但我没有直接答复的要点。)

一般规则是对作业使用正确的算法,除非与数据相比,卷积内核较短,否则该算法是基于FFT的卷积(short大致意味着小于log2(n),其中n是数据的长度)。在

在您的例子中,由于您正在卷积两个大小相等的数据集,您可能需要考虑基于FFT的卷积。在

显然,scipy.signal.fftconvolve在这方面是一个不足之处。使用更快的FFT算法,您可以通过滚动自己的卷积例程来做得更好(fftconvolve将变换大小强制为2的幂次,否则可能会被monkey修补)。在

下面的代码使用pyfftw,我在FFTW周围进行包装,并创建一个自定义卷积类CustomFFTConvolution

class CustomFFTConvolution(object):

    def __init__(self, A, B, threads=1):

        shape = (np.array(A.shape) + np.array(B.shape))-1

        if np.iscomplexobj(A) and np.iscomplexobj(B):
            self.fft_A_obj = pyfftw.builders.fftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.fftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.ifftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

        else:
            self.fft_A_obj = pyfftw.builders.rfftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.rfftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.irfftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

    def __call__(self, A, B):

        fft_padded_A = self.fft_A_obj(A)
        fft_padded_B = self.fft_B_obj(B)

        return self.ifft_obj(fft_padded_A * fft_padded_B)

它用作:

^{pr2}$

在构造类时使用可选的threads参数。FFF的目的是为了在课堂上创造一个有益于TW的能力。在

下面的完整演示代码只是扩展了@Kelsey's answer的计时等。在

与numba解决方案和香草fftconvolve解决方案相比,加速效果显著。对于n=33,它比两者都快40-45倍。在

from timeit import Timer
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import fftconvolve
from numba import jit, double
import pyfftw

# Original code
def custom_convolution(A, B):

    dimA = A.shape[0]
    dimB = B.shape[0]
    dimC = dimA + dimB

    C = np.zeros((dimC, dimC, dimC))
    for x1 in range(dimA):
        for x2 in range(dimB):
            for y1 in range(dimA):
                for y2 in range(dimB):
                    for z1 in range(dimA):
                        for z2 in range(dimB):
                            x = x1 + x2
                            y = y1 + y2
                            z = z1 + z2
                            C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2]
    return C

# Numba'ing the function with the JIT compiler
numba_convolution = jit(double[:, :, :](double[:, :, :],
                        double[:, :, :]))(custom_convolution)

def fft_convolution(A, B):
    return fftconvolve(A, B, mode='full')

class CustomFFTConvolution(object):

    def __init__(self, A, B, threads=1):

        shape = (np.array(A.shape) + np.array(B.shape))-1

        if np.iscomplexobj(A) and np.iscomplexobj(B):
            self.fft_A_obj = pyfftw.builders.fftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.fftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.ifftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

        else:
            self.fft_A_obj = pyfftw.builders.rfftn(
                    A, s=shape, threads=threads)
            self.fft_B_obj = pyfftw.builders.rfftn(
                    B, s=shape, threads=threads)
            self.ifft_obj = pyfftw.builders.irfftn(
                    self.fft_A_obj.get_output_array(), s=shape,
                    threads=threads)

    def __call__(self, A, B):

        fft_padded_A = self.fft_A_obj(A)
        fft_padded_B = self.fft_B_obj(B)

        return self.ifft_obj(fft_padded_A * fft_padded_B)

def run_test():
    reps = 10
    nt, ft, cft, cft2 = [], [], [], []
    x = range(2, 34)

    for N in x:
        print N
        A = np.random.rand(N, N, N)
        B = np.random.rand(N, N, N)

        custom_fft_conv = CustomFFTConvolution(A, B)
        custom_fft_conv_nthreads = CustomFFTConvolution(A, B, threads=2)

        C1 = numba_convolution(A, B)
        C2 = fft_convolution(A, B)
        C3 = custom_fft_conv(A, B)
        C4 = custom_fft_conv_nthreads(A, B)

        assert np.allclose(C1[:-1, :-1, :-1], C2)
        assert np.allclose(C1[:-1, :-1, :-1], C3)
        assert np.allclose(C1[:-1, :-1, :-1], C4)

        t = Timer(lambda: numba_convolution(A, B))
        nt.append(t.timeit(number=reps))
        t = Timer(lambda: fft_convolution(A, B))
        ft.append(t.timeit(number=reps))
        t = Timer(lambda: custom_fft_conv(A, B))
        cft.append(t.timeit(number=reps))
        t = Timer(lambda: custom_fft_conv_nthreads(A, B))
        cft2.append(t.timeit(number=reps))

    plt.plot(x, ft, label='scipy.signal.fftconvolve')
    plt.plot(x, nt, label='custom numba convolve')
    plt.plot(x, cft, label='custom pyfftw convolve')
    plt.plot(x, cft2, label='custom pyfftw convolve with threading')        
    plt.legend()
    plt.show()

if __name__ == '__main__':
    run_test()

EDIT:More recent scipy does a better job of not always padding to powers of 2 length所以在输出上更接近pyFFTW的情况。在

你试过使用Numba吗?它是一个包,它允许您用JIT编译器包装通常较慢的Python代码。我用Numba快速解决了你的问题,并得到了显著的加速。使用IPython的magic timeit魔术函数,custom_convolution函数用了~18秒,而Numba的优化函数用了10.4毫秒,的速度提高了1700。在

下面是Numba是如何实现的。在

import numpy as np
from numba import jit, double

s = 15
array_a = np.random.rand(s ** 3).reshape(s, s, s)
array_b = np.random.rand(s ** 3).reshape(s, s, s)

# Original code
def custom_convolution(A, B):

    dimA = A.shape[0]
    dimB = B.shape[0]
    dimC = dimA + dimB

    C = np.zeros((dimC, dimC, dimC))
    for x1 in range(dimA):
        for x2 in range(dimB):
            for y1 in range(dimA):
                for y2 in range(dimB):
                    for z1 in range(dimA):
                        for z2 in range(dimB):
                            x = x1 + x2
                            y = y1 + y2
                            z = z1 + z2
                            C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2]
    return C

# Numba'ing the function with the JIT compiler
fast_convolution = jit(double[:, :, :](double[:, :, :],
                        double[:, :, :]))(custom_convolution)

如果你计算两个函数结果之间的残差,你将得到零。这意味着JIT实现可以正常工作。在

^{pr2}$

我得到的输出是0.0。在

您可以将Numba安装到当前的Python设置中,也可以使用来自连续体.io. 在

最后但并非最不重要的是,Numba的功能比scipy.signal.fftconvolve功能快了几倍。在

注意:我用的是Python而不是Python。对于Numba的表现,这两个包有些不同,但我认为不会有太大的差别。在

相关问题 更多 >

    热门问题