对ndarray的每一行应用一个函数

2024-10-02 02:34:04 发布

您现在位置:Python中文网/ 问答频道 /正文

我有这个函数来计算向量x的平方马氏距离,意思是:

def mahalanobis_sqdist(x, mean, Sigma):
   '''
    Calculates squared Mahalanobis Distance of vector x 
    to distibutions' mean 
   '''
   Sigma_inv = np.linalg.inv(Sigma)
   xdiff = x - mean
   sqmdist = np.dot(np.dot(xdiff, Sigma_inv), xdiff)
   return sqmdist

我有一个numpy数组,它的形状是(25, 4)。所以,我想将该函数应用到数组的所有25行,而不使用for循环。所以,基本上,我如何写出这个循环的矢量形式:

for r in d1:
    mahalanobis_sqdist(r[0:4], mean1, Sig1)

其中mean1Sig1是:

>>> mean1
array([ 5.028,  3.48 ,  1.46 ,  0.248])
>>> Sig1 = np.cov(d1[0:25, 0:4].T)
>>> Sig1
array([[ 0.16043333,  0.11808333,  0.02408333,  0.01943333],
       [ 0.11808333,  0.13583333,  0.00625   ,  0.02225   ],
       [ 0.02408333,  0.00625   ,  0.03916667,  0.00658333],
       [ 0.01943333,  0.02225   ,  0.00658333,  0.01093333]])

我试过以下方法,但没有成功:

>>> vecdist = np.vectorize(mahalanobis_sqdist)
>>> vecdist(d1, mean1, Sig1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/dist-packages/numpy/lib/function_base.py", line 1862, in __call__
    theout = self.thefunc(*newargs)
  File "<stdin>", line 6, in mahalanobis_sqdist
  File "/usr/lib/python2.7/dist-packages/numpy/linalg/linalg.py", line 445, in inv
    return wrap(solve(a, identity(a.shape[0], dtype=a.dtype)))
IndexError: tuple index out of range

Tags: innumpynplinemeansigmafiled1
3条回答

要将函数应用于数组的每一行,可以使用:

np.apply_along_axis(mahalanobis_sqdist, 1, d1, mean1, Sig1)    

不过,在这种情况下,还有更好的办法。不必对每一行应用函数。相反,您可以对整个d1数组应用NumPy操作来计算相同的结果。np.einsum可以替换for-loop和对np.dot的两个调用:


def mahalanobis_sqdist2(d, mean, Sigma):
   Sigma_inv = np.linalg.inv(Sigma)
   xdiff = d - mean
   return np.einsum('ij,im,mj->i', xdiff, xdiff, Sigma_inv)

以下是一些基准:

import numpy as np
np.random.seed(1)

def mahalanobis_sqdist(x, mean, Sigma):
   '''
   Calculates squared Mahalanobis Distance of vector x 
   to distibutions mean 
   '''
   Sigma_inv = np.linalg.inv(Sigma)
   xdiff = x - mean
   sqmdist = np.dot(np.dot(xdiff, Sigma_inv), xdiff)
   return sqmdist

def mahalanobis_sqdist2(d, mean, Sigma):
   Sigma_inv = np.linalg.inv(Sigma)
   xdiff = d - mean
   return np.einsum('ij,im,mj->i', xdiff, xdiff, Sigma_inv)

def using_loop(d1, mean, Sigma):
    expected = []
    for r in d1:
        expected.append(mahalanobis_sqdist(r[0:4], mean1, Sig1))
    return np.array(expected)

d1 = np.random.random((25,4))
mean1 = np.array([ 5.028,  3.48 ,  1.46 ,  0.248])
Sig1 = np.cov(d1[0:25, 0:4].T)

expected = using_loop(d1, mean1, Sig1)
result = np.apply_along_axis(mahalanobis_sqdist, 1, d1, mean1, Sig1)
result2 = mahalanobis_sqdist2(d1, mean1, Sig1)
assert np.allclose(expected, result)
assert np.allclose(expected, result2)

In [92]: %timeit mahalanobis_sqdist2(d1, mean1, Sig1)
10000 loops, best of 3: 31.1 µs per loop
In [94]: %timeit using_loop(d1, mean1, Sig1)
1000 loops, best of 3: 569 µs per loop
In [91]: %timeit np.apply_along_axis(mahalanobis_sqdist, 1, d1, mean1, Sig1)
1000 loops, best of 3: 806 µs per loop

因此mahalanobis_sqdist2for-loop快18倍,比使用np.apply_along_axis快26倍。


注意,np.apply_along_axisnp.vectorizenp.frompyfunc是Python实用程序函数。在引擎盖下他们使用for-while-loops。这里没有真正的“矢量化”。它们可以提供语法帮助,但不要期望它们能使代码的性能比您自己编写的代码更好。

刚刚在reddit上看到了一个非常好的评论,它可能会让事情更快一点:

This is not surprising to anyone who uses numpy regularly. For loops in python are horribly slow. Actually, einsum is pretty slow too. Here's a version that is faster if you have lots of vectors (500 vectors in 4 dimensions is enough to make this version faster than einsum on my machine):

def no_einsum(d, mean, Sigma):
    L_inv = np.linalg.inv(numpy.linalg.cholesky(Sigma))
    xdiff = d - mean
    return np.sum(np.dot(xdiff, L_inv.T)**2, axis=1)

If your points are also high dimensional then computing the inverse is slow (and generally a bad idea anyway) and you can save time by solving the system directly (500 vectors in 250 dimensions is enough to make this version the fastest on my machine):

def no_einsum_solve(d, mean, Sigma):
    L = numpy.linalg.cholesky(Sigma)
    xdiff = d - mean
    return np.sum(np.linalg.solve(L, xdiff.T)**2, axis=0)

@unutbu的答案对于将任何函数应用于数组的行非常有效。 在这种特殊情况下,如果使用大型数组,可以使用一些数学对称性来显著加快速度。

以下是函数的修改版本:

def mahalanobis_sqdist3(x, mean, Sigma):
    Sigma_inv = np.linalg.inv(Sigma)
    xdiff = x - mean
    return (xdiff.dot(Sigma_inv)*xdiff).sum(axis=-1)

如果您最终使用任何类型的大型Sigma,我建议您缓存Sigma_inv,并将其作为参数传递给函数。 因为在本例中是4x4,所以这无关紧要。 我将展示如何处理大型的Sigma不管是谁遇到这个。

如果您不打算重复使用同一个Sigma,您将无法缓存它,因此,您可以使用不同的方法来求解线性系统,而不是反转矩阵。 在这里,我将使用内置到SciPy中的LU分解。 这只会在x的列数相对于其行数较大时提高时间。

下面是一个函数,它显示了这种方法:

from scipy.linalg import lu_factor, lu_solve
def mahalanobis_sqdist4(x, mean, Sigma):
    xdiff = x - mean
    Sigma_inv = lu_factor(Sigma)
    return (xdiff.T*lu_solve(Sigma_inv, xdiff.T)).sum(axis=0)

这里有一些时间安排。 我将把另一个答案中提到的带有einsum的版本包括在内。

import numpy as np
Sig1 = np.array([[ 0.16043333,  0.11808333,  0.02408333,  0.01943333],
                 [ 0.11808333,  0.13583333,  0.00625   ,  0.02225   ],
                 [ 0.02408333,  0.00625   ,  0.03916667,  0.00658333],
                 [ 0.01943333,  0.02225   ,  0.00658333,  0.01093333]])
mean1 = np.array([ 5.028,  3.48 ,  1.46 ,  0.248])
x = np.random.rand(25, 4)
%timeit np.apply_along_axis(mahalanobis_sqdist, 1, x, mean1, Sig1)
%timeit mahalanobis_sqdist2(x, mean1, Sig1)
%timeit mahalanobis_sqdist3(x, mean1, Sig1)
%timeit mahalanobis_sqdist4(x, mean1, Sig1)

给予:

1000 loops, best of 3: 973 µs per loop
10000 loops, best of 3: 36.2 µs per loop
10000 loops, best of 3: 40.8 µs per loop
10000 loops, best of 3: 83.2 µs per loop

但是,更改所涉及阵列的大小会更改计时结果。 例如,让x = np.random.rand(2500, 4),计时如下:

10 loops, best of 3: 95 ms per loop
1000 loops, best of 3: 355 µs per loop
10000 loops, best of 3: 131 µs per loop
1000 loops, best of 3: 337 µs per loop

x = np.random.rand(1000, 1000)Sigma1 = np.random.rand(1000, 1000)mean1 = np.random.rand(1000)计时如下:

1 loops, best of 3: 1min 24s per loop
1 loops, best of 3: 2.39 s per loop
10 loops, best of 3: 155 ms per loop
10 loops, best of 3: 99.9 ms per loop

编辑:我注意到其他答案之一使用了Cholesky分解。 假设Sigma是对称的正定的,我们实际上可以做得比上面的结果更好。 通过SciPy,BLAS和LAPACK提供了一些很好的例程,可以处理对称正定矩阵。 这里有两个更快的版本。

from scipy.linalg.fblas import dsymm
def mahalanobis_sqdist5(x, mean, Sigma_inv):
    xdiff = x - mean
    Sigma_inv = la.inv(Sigma)
    return np.einsum('...i,...i->...',dsymm(1., Sigma_inv, xdiff.T).T, xdiff)
from scipy.linalg.flapack import dposv
def mahalanobis_sqdist6(x, mean, Sigma):
    xdiff = x - mean
    return np.einsum('...i,...i->...', xdiff, dposv(Sigma, xdiff.T)[1].T)

第一个仍然颠倒西格玛。 如果预先计算并重用逆运算,则速度会快得多(在我的机器上,使用预先计算的逆运算,1000x1000的情况需要35.6ms)。 我还使用了einsum来获取产品,然后沿着最后一个轴求和。 结果,这比做(A * B).sum(axis=-1)之类的事情快得多。 这两个功能提供以下计时:

第一个测试用例:

10000 loops, best of 3: 55.3 µs per loop
100000 loops, best of 3: 14.2 µs per loop

第二个测试用例:

10000 loops, best of 3: 121 µs per loop
10000 loops, best of 3: 79 µs per loop

第三个测试用例:

10 loops, best of 3: 92.5 ms per loop
10 loops, best of 3: 48.2 ms per loop

相关问题 更多 >

    热门问题