在NumPy中高效地计算给定向量元素的所有成对乘积

2024-10-19 16:26:56 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在寻找一种“最佳”方法来计算给定向量元素的所有两两乘积。如果向量的大小为N,则输出将是一个大小为N * (N + 1) // 2的向量,并包含所有(i, j)对的i <= j值。简单的计算方法如下:

import numpy as np

def get_pairwise_products_naive(vec: np.ndarray):
    k, size = 0, vec.size
    output = np.empty(size * (size + 1) // 2)
    for i in range(size):
        for j in range(i, size):
            output[k] = vec[i] * vec[j]
            k += 1
    return output

德西德拉塔:

  • 最小化额外内存分配/使用:如果可能,直接写入输出缓冲区
  • 使用矢量化NumPy例程而不是显式循环
  • 避免额外的(不必要的)计算

我一直在使用outertriu_indiceseinsum等例程以及一些索引/视图技巧,但未能找到符合上述要求的解决方案


Tags: 方法inimportnumpy元素foroutputsize
3条回答

您还可以将此算法并行化。如果可以只分配一次足够大的阵列(该阵列上较小的视图几乎不需要任何成本),然后覆盖它,则可以实现更大的加速

示例

@numba.njit(parallel=True)
def pairwise_products_numba_2_with_allocation(vec):
    k, size = 0, vec.size
    k_vec=np.empty(vec.size,dtype=np.int64)
    output = np.empty(size * (size + 1) // 2)

    #precalculate the indices
    for i in range(size):
        k_vec[i] = k
        k+=(size-i)

    for i in numba.prange(size):
        k=k_vec[i]
        for j in range(size-i):
            output[k+j] = vec[i] * vec[j+i]

    return output

@numba.njit(parallel=True)
def pairwise_products_numba_2_without_allocation(vec,output):
    k, size = 0, vec.size
    k_vec=np.empty(vec.size,dtype=np.int64)

    #precalculate the indices
    for i in range(size):
        k_vec[i] = k
        k+=(size-i)

    for i in numba.prange(size):
        k=k_vec[i]
        for j in range(size-i):
            output[k+j] = vec[i] * vec[j+i]

    return output

计时

A=np.arange(5000)
k, size = 0, A.size
output = np.empty(size * (size + 1) // 2)

%timeit res_1=pairwise_products_numba_2_without_allocation(A,output)
#7.84 ms ± 116 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res_2=pairwise_products_numba_2_with_allocation(A)
#16.9 ms ± 325 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res_3=pairwise_products_numba(A) #@orlp
#43.3 ms ± 134 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

我可能会计算M=vTv,然后展平该矩阵的较低或较高三角形部分

def pairwise_products(v: np.ndarray):
    assert len(v.shape) == 1
    n = v.shape[0]
    m = v.reshape(n, 1) @ v.reshape(1, n)
    return m[np.tril_indices_from(m)].ravel()

我还想提到^{},这将使您的“幼稚”方法很可能比此方法更快

import numba

@numba.njit
def pairwise_products_numba(vec: np.ndarray):
    k, size = 0, vec.size
    output = np.empty(size * (size + 1) // 2)
    for i in range(size):
        for j in range(i, size):
            output[k] = vec[i] * vec[j]
            k += 1
    return output

仅测试上述pairwise_products(np.arange(5000))需要约0.3秒,而numba版本需要约0.05秒(忽略用于及时编译函数的第一次运行)

方法#1

对于带有NumPy的向量化的一个,您可以在获得所有带有外部乘法的两两乘法后使用掩蔽一个,如下所示-

def pairwise_multiply_masking(a):
    return (a[:,None]*a)[~np.tri(len(a),k=-1,dtype=bool)]

方法#2

对于真正大的输入1D数组,我们可能需要使用迭代slicing方法,该方法使用一个循环-

def pairwise_multiply_iterative_slicing(a):
    n = len(a)
    N = (n*(n+1))//2
    out = np.empty(N, dtype=a.dtype)
    c = np.r_[0,np.arange(n,0,-1)].cumsum()
    for ii,(i,j) in enumerate(zip(c[:-1],c[1:])):
        out[i:j] = a[ii:]*a[ii]
    return out

基准测试

我们将在设置中包括^{} and ^{} from @orlp's solution

使用^{}包(打包在一起的一些基准测试工具;免责声明:我是它的作者)对建议的解决方案进行基准测试

import benchit
funcs = [pairwise_multiply_masking, pairwise_multiply_iterative_slicing, pairwise_products_numba, pairwise_products]
in_ = [np.random.rand(n) for n in [10,50,100,200,500,1000,5000]]
t = benchit.timings(funcs, in_)
t.plot(logx=True, save='timings.png')
t.speedups(-1).plot(logx=True, logy=False, save='speedups.png')

结果(超过pairwise_products的计时和加速)——

enter image description here

enter image description here

从绘图趋势可以看出,对于真正大型的阵列,基于切片的阵列将开始获胜,否则矢量化阵列将表现出色

建议

  • 我们还可以研究numexpr,以便对大型数组更有效地执行外部乘法

相关问题 更多 >