如何优化Python中的嵌套for循环

2024-10-01 11:41:35 发布

您现在位置:Python中文网/ 问答频道 /正文

所以我试图编写一个python函数来返回一个称为mielkeberry R值的度量。指标的计算如下: enter image description here

我现在写的代码还可以,但是由于公式中的和,我唯一能想到的解决方法就是在Python中使用一个嵌套的for循环,这非常慢。。。在

以下是我的代码:

def mb_r(forecasted_array, observed_array):
    """Returns the Mielke-Berry R value."""
    assert len(observed_array) == len(forecasted_array)
    y = forecasted_array.tolist()
    x = observed_array.tolist()
    total = 0
    for i in range(len(y)):
        for j in range(len(y)):
            total = total + abs(y[j] - x[i])
    total = np.array([total])
    return 1 - (mae(forecasted_array, observed_array) * forecasted_array.size ** 2 / total[0])

我将输入数组转换为list的原因是我听说(还没有测试)使用python for循环索引numpy数组的速度非常慢。在

我觉得可能有某种新的函数可以更快地解决这个问题,有人知道吗?在


Tags: 函数代码inforlen度量range数组
3条回答

纽比广播

如果不受内存限制,优化numpy中嵌套循环的第一步是使用广播并以矢量化的方式执行操作:

import numpy as np    

def mb_r(forecasted_array, observed_array):
        """Returns the Mielke-Berry R value."""
        assert len(observed_array) == len(forecasted_array)
        total = np.abs(forecasted_array[:, np.newaxis] - observed_array).sum() # Broadcasting
        return 1 - (mae(forecasted_array, observed_array) * forecasted_array.size ** 2 / total[0])

但在这种情况下,循环是用C而不是Python进行的,它涉及到一个大小(N,N)数组的分配。在

广播不是灵丹妙药,试着打开内环

如上所述,广播意味着巨大的内存开销。所以它应该小心使用,而且它并不总是正确的方法。虽然你的第一印象可能是在任何地方使用它-不要。不久前我也被这个事实搞糊涂了,看我的问题Numpy ufuncs speed vs for loop speed。为了不太冗长,我将在您的示例中显示这一点:

^{pr2}$

小型阵列(广播更快)

forecasted = np.random.rand(100)
observed = np.random.rand(100)

%timeit mb_r_bcast(forecasted, observed)
57.5 µs ± 359 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
%timeit mb_r_unroll(forecasted, observed)
1.17 ms ± 2.53 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

中型阵列(相等)

forecasted = np.random.rand(1000)
observed = np.random.rand(1000)

%timeit mb_r_bcast(forecasted, observed)
15.6 ms ± 208 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit mb_r_unroll(forecasted, observed)
16.4 ms ± 13.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

大尺寸阵列(广播速度较慢)

forecasted = np.random.rand(10000)
observed = np.random.rand(10000)

%timeit mb_r_bcast(forecasted, observed)
1.51 s ± 18 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit mb_r_unroll(forecasted, observed)
377 ms ± 994 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

正如您可以看到的,对于小型阵列,广播版本比展开的快20倍,对于中型阵列,它们是而不是相等,但对于大型阵列,它的速度4倍,因为内存开销要付出高昂的代价。在

Numba jit与并行化

另一种方法是使用numba及其强大的@jit函数修饰符。在这种情况下,只需对初始代码稍作修改。另外,为了使循环并行,您应该将range更改为prange,并提供parallel=True关键字参数。在下面的代码片段中,我使用了与@jit(nopython=True)相同的@njit修饰符:

from numba import njit, prange

@njit(parallel=True)
def mb_r_njit(forecasted_array, observed_array):
    """Returns the Mielke-Berry R value."""
    assert len(observed_array) == len(forecasted_array)
    total = 0.
    size = len(forecasted_array)
    for i in prange(size):
        observed = observed_array[i]
        for j in prange(size):
            total += abs(forecasted_array[j] - observed)
    return 1 - (mae(forecasted_array, observed_array) * size ** 2 / total)

您没有提供mae函数,但是要在njit模式下运行代码,还必须修饰mae函数,或者如果它是一个数字,则将其作为一个参数传递给jitted函数。在

其他选项

Python的科学生态系统是巨大的,我只是提到了其他一些可以加速的选项:CythonNuitkaPythranbottleneck等等。也许你对gpu computing感兴趣,但这实际上是另一个故事。在

时间安排

不幸的是,在我的电脑上,时间安排是:

import numpy as np
import numexpr as ne

forecasted_array = np.random.rand(10000)
observed_array   = np.random.rand(10000)

初始版本

%timeit mb_r(forecasted_array, observed_array)
23.4 s ± 430 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

numexpr

%%timeit
forecasted_array2d = forecasted_array[:, np.newaxis]
ne.evaluate('sum(abs(forecasted_array2d - observed_array))')[()]
784 ms ± 11.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

广播版

%timeit mb_r_bcast(forecasted, observed)
1.47 s ± 4.13 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

内部循环展开版本

%timeit mb_r_unroll(forecasted, observed)
389 ms ± 11.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

numbanjit(parallel=True)版本

%timeit mb_r_njit(forecasted_array, observed_array)
32 ms ± 4.05 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

可以看出,njit方法比初始解决方案快730倍,也比numexpr解决方案快24.5倍(也许你需要英特尔的矢量数学库来加速)。同样,与初始版本相比,内部循环展开的简单方法可以使60x加速。我的规格是:

Intel(R)Core(TM)2四CPU Q9550 2.83GHz
Python 3.6.3
数字1.13.3
数字0.36.1
numexpr 2.6.4

最后说明

很奇怪,这是一个非常缓慢的测试

arr = np.arange(1000)
ls = arr.tolistist()

%timeit for i in arr: pass
69.5 µs ± 282 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

%timeit for i in ls: pass
13.3 µs ± 81.8 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%timeit for i in range(len(arr)): arr[i]
167 µs ± 997 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

%timeit for i in range(len(ls)): ls[i]
90.8 µs ± 1.07 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

结果证明你是对的。迭代列表的速度2-5x。当然,这些结果必须带有一定的讽刺意味:)

这里有一种矢量化的方法来利用^{}得到{}-

np.abs(forecasted_array[:,None] - observed_array).sum()

为了接受相同的列表和数组,我们可以使用NumPy内建函数进行外部减法,如下-

^{pr2}$

我们还可以利用^{} module来进行更快的absolute计算,并在一个单独的numexpr evaluate调用中执行summation-reductions,这样可以大大提高内存效率,比如-

import numexpr as ne

forecasted_array2D = forecasted_array[:,None]
total = ne.evaluate('sum(abs(forecasted_array2D - observed_array))')

以下代码作为参考:

#pythran export mb_r(float64[], float64[])
import numpy as np

def mb_r(forecasted_array, observed_array):
    return np.abs(forecasted_array[:,None] - observed_array).sum()

在纯CPython上以以下速度运行:

^{pr2}$

当用Pythran编译时,我得到

% pythran -march=native -DUSE_BOOST_SIMD mbr.py
% python -m perf timeit -s 'import numpy as np; x = np.random.rand(400); y = np.random.rand(400); from mbr import mb_r' 'mb_r(x, y)'
.....................
Mean +- std dev: 65.8 us +- 1.7 us

所以大概是一个x10加速,在一个带有AVX扩展的核心上。在

相关问题 更多 >