提高代码效率:滑动窗口上的标准差

2024-04-27 21:49:46 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试改进函数,该函数为图像的每个像素计算位于像素附近的像素的标准偏差。我的函数使用两个嵌入的循环来遍历矩阵,这是我的程序的瓶颈。我想可能有一种方法可以通过去掉numpy的循环来改进它,但是我不知道如何继续。 欢迎任何建议!

问候

def sliding_std_dev(image_original,radius=5) :
    height, width = image_original.shape
    result = np.zeros_like(image_original) # initialize the output matrix
    hgt = range(radius,height-radius)
    wdt = range(radius,width-radius)
    for i in hgt:
        for j in wdt:
            result[i,j] = np.std(image_original[i-radius:i+radius,j-radius:j+radius])
    return result

Tags: 函数inimagefornprange像素result
3条回答

首先,有不止一种方法可以做到这一点。

这不是最有效的速度方面,但是使用^{}将允许您轻松地在移动窗口上应用任意python函数。

举个简单的例子:

result = scipy.ndimage.generic_filter(data, np.std, size=2*radius)

注意,边界条件可以由modekwarg控制。


另一种方法是使用一些不同的跨步技巧来创建有效地作为移动窗口的数组视图,然后沿最后一个轴应用np.std。(注意:这是从我以前的一个答案中得到的:https://stackoverflow.com/a/4947453/325565

def strided_sliding_std_dev(data, radius=5):
    windowed = rolling_window(data, (2*radius, 2*radius))
    shape = windowed.shape
    windowed = windowed.reshape(shape[0], shape[1], -1)
    return windowed.std(axis=-1)

def rolling_window(a, window):
    """Takes a numpy array *a* and a sequence of (or single) *window* lengths
    and returns a view of *a* that represents a moving window."""
    if not hasattr(window, '__iter__'):
        return rolling_window_lastaxis(a, window)
    for i, win in enumerate(window):
        if win > 1:
            a = a.swapaxes(i, -1)
            a = rolling_window_lastaxis(a, win)
            a = a.swapaxes(-2, i)
    return a

def rolling_window_lastaxis(a, window):
    """Directly taken from Erik Rigtorp's post to numpy-discussion.
    <http://www.mail-archive.com/numpy-discussion@scipy.org/msg29450.html>"""
    if window < 1:
       raise ValueError, "`window` must be at least 1."
    if window > a.shape[-1]:
       raise ValueError, "`window` is too long."
    shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
    strides = a.strides + (a.strides[-1],)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

乍一看有点难以理解这里发生了什么。我不想插入我自己的答案,但我不想重新键入解释,所以看看这里:https://stackoverflow.com/a/4924433/325565如果你以前没有看到过这种“跨步”技巧。

如果我们将计时与一个100x100随机浮点数数组(其radius为5)进行比较,它比原始版本或generic_filter版本快约10倍。但是,使用此版本在边界条件方面没有灵活性。(它与您当前所做的工作完全相同,而generic_filter版本以牺牲速度为代价,为您提供了很多灵活性。)

# Your original function with nested loops
In [21]: %timeit sliding_std_dev(data)
1 loops, best of 3: 237 ms per loop

# Using scipy.ndimage.generic_filter
In [22]: %timeit ndimage_std_dev(data)
1 loops, best of 3: 244 ms per loop

# The "stride-tricks" version above
In [23]: %timeit strided_sliding_std_dev(data)
100 loops, best of 3: 15.4 ms per loop

# Ophion's version that uses `np.take`
In [24]: %timeit new_std_dev(data)
100 loops, best of 3: 19.3 ms per loop

“跨步技巧”版本的缺点是,与“普通”跨步滚动窗口技巧不同,此版本的会复制一个,它比原始数组大得多。如果在一个大数组中使用它,您将遇到内存问题!(另一方面,在内存使用和速度方面,它基本上等同于@Ophion的答案。这只是做同样事情的另一种方法。)

酷把戏:你可以计算标准差,只要在窗口中给出平方值和值的和。

因此,您可以使用数据的统一过滤器快速计算标准偏差:

from scipy.ndimage.filters import uniform_filter

def window_stdev(arr, radius):
    c1 = uniform_filter(arr, radius*2, mode='constant', origin=-radius)
    c2 = uniform_filter(arr*arr, radius*2, mode='constant', origin=-radius)
    return ((c2 - c1*c1)**.5)[:-radius*2+1,:-radius*2+1]

这比原始函数快得多。对于1024x1024数组和半径为20的数组,旧函数需要34.11秒,而新函数需要0.11秒,速度提高了300倍。


这在数学上是如何工作的?它计算每个窗口的数量sqrt(mean(x^2) - mean(x)^2)。我们可以从标准差sqrt(mean((x - mean(x))^2))中导出这个量,如下所示:

E为期望算子(基本上是mean()),设X为数据的随机变量。然后:

E[(X - E[X])^2]
= E[X^2 - 2X*E[X] + E[X]^2]
= E[X^2] - E[2X*E[X]] + E[E[X]^2](通过期望算符的线性)
= E[X^2] - 2E[X]*E[X] + E[X]^2(同样是线性关系,而且E[X]是一个常数)
= E[X^2] - E[X]^2

这证明了用这种方法计算的量在数学上等价于标准差。

在图像处理中,最常用的方法是使用求和面积表,这是1984年this paper中引入的一种思想。其思想是,当你通过在一个窗口上加上来计算一个数量,并将窗口(例如,向右移动一个像素)时,你不需要添加新窗口中的所有项,你只需要从总数中减去最左边的列,然后添加新的最右边的列。因此,如果从数组中在两个维度上创建一个累加和数组,则可以在一个窗口上通过两个和和和一个减法得到和。如果为数组及其平方保留面积求和表,那么很容易从这两个表中得到方差。下面是一个实现:

def windowed_sum(a, win):
    table = np.cumsum(np.cumsum(a, axis=0), axis=1)
    win_sum = np.empty(tuple(np.subtract(a.shape, win-1)))
    win_sum[0,0] = table[win-1, win-1]
    win_sum[0, 1:] = table[win-1, win:] - table[win-1, :-win]
    win_sum[1:, 0] = table[win:, win-1] - table[:-win, win-1]
    win_sum[1:, 1:] = (table[win:, win:] + table[:-win, :-win] -
                       table[win:, :-win] - table[:-win, win:])
    return win_sum

def windowed_var(a, win):
    win_a = windowed_sum(a, win)
    win_a2 = windowed_sum(a*a, win)
    return (win_a2 - win_a * win_a / win/ win) / win / win

要确保这一点:

>>> a = np.arange(25).reshape(5,5)
>>> windowed_var(a, 3)
array([[ 17.33333333,  17.33333333,  17.33333333],
       [ 17.33333333,  17.33333333,  17.33333333],
       [ 17.33333333,  17.33333333,  17.33333333]])
>>> np.var(a[:3, :3])
17.333333333333332
>>> np.var(a[-3:, -3:])
17.333333333333332

这应该比基于卷积的方法快一些。

相关问题 更多 >