这个问题可能与在Why is vectorized numpy code slower than for loops?回答的问题有相同的味道
我想实现一个sigma裁剪方法来拒绝一系列图像中的一些异常值。下面是我得到的,首先是一些生成带有异常值的随机数据立方体(我通常有4k x 4k的真实图像)的代码
生成一些随机数据的代码:
import numpy as np
def make_outliers(mean=1000, err=10, size=50):
# Create 1 stack of pixel with 10% outliers.
std = np.sqrt(mean)
n_outliers = np.random.randint(0, int(size/10))
size -= n_outliers
random_err = err * np.random.normal(loc=mean, scale=std, size=size) * np.random.choice((-1, 1), size)
data = mean + random_err
outlier_int = 50 * mean
outlier_errs = outlier_int * np.random.rand(n_outliers) * np.random.choice((-1, 1), n_outliers)
data = np.concatenate((data, outlier_errs))
np.random.shuffle(data)
return data
def cube_outliers(size=(128, 128, 50)):
# Make an image series with outliers with respect to the 3rd dimension
cube = np.empty(size)
for r in range(size[0]):
for c in range(size[1]):
cube[r,c,:] = make_outliers(size=size[-1])
return cube
Sigma裁剪函数,将被拒绝的像素作为布尔掩码返回:
def sigma_clip(datacube):
rejMask = np.zeros(datacube.shape, dtype=np.bool)
n = 1
n_outliers = []
while n > 0:
rejMask0 = rejMask.copy()
med = np.nanmedian(datacube, axis=-1)
sigma = np.nanstd(datacube, axis=-1)
rejMask = (np.abs(datacube - med[...,np.newaxis]) > 5*sigma[...,np.newaxis])
n = rejMask.sum()
n_outliers.append(n)
# print(n)
rejMask = rejMask0 | rejMask
datacube[rejMask] = np.nan
return rejMask, n_outliers
# Generate data cube for testing, emulating a series of 100 images.
images = cube_outliers(size=(1024, 1024, 100))
矢量化版本的计时:
%timeit rej_mask1, n_outs = sigma_clip(images.copy())
print(n_outs, sum(n_outs))
24.2 s ± 114 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
[22213, 36, 0] 22249
在所有行上使用for循环计时(实际上是列表理解)
%%timeit
images2 = images.copy()
nout_rows = []
rej_mask2 = np.empty(images.shape)
for r in range(images.shape[0]):
rej_mask2[r, ...], nout_r = sigma_clip(images2[r,...])
nout_rows.append(nout_r)
10.6 s ± 60.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
nloops = max([len(nouts) for nouts in nout_rows])
nout_sum = sum([sum(nouts) for nouts in nout_rows])
print(nout_max, nout_sum)
np.array_equal(rej_mask1, rej_mask2)
(3, 22249)
True
所以这里的矢量化代码比我使用一个for循环逐行运行sigma裁剪过滤器时慢了两倍多。该示例在每个维度中使用的图像大小至少是真实场景中图像大小的一半。这就是我在矢量化代码中所做的事情,它不是Numpy优化的,还有更好的方法使用Numpy吗?或者,这个问题类似于上面提到的与CPU相关的缓存/L1的问题。在后一种情况下,我希望Numpy为我们优化它,因为硬件方面的考虑更多的是我会明确地使用各种编译器选项,而不是我希望Numpy让我想到的东西
如果不使用Cython或Nuba进行并行化,在坚持使用Python/Numpy的同时,有没有更快的方法来实现这一点?我有约24 GB的可分配内存与机器在这个时间使用
[Update]在这个版本之前,我尝试过使用屏蔽数组,但它明显比直接用np.nan
和nanmedian()
和nanstd()
标记坏值慢
[Update 2]基于这些评论,为了明确两次测试之间被拒绝的像素数和关于第三维的while循环总数没有差异,我在上面的代码中添加了一个计数器(n_outliers
),跟踪被剪裁的每个像素堆栈在while循环中的迭代次数。它只是有助于断言,如果在两个测试之间不重新运行随机数据生成(否则,您只会得到具有不同大小的新异常值集的不同数据),那么对于被拒绝的像素,代码的行为是相同的。此外,我检查np.array_equal(rej_mask1, rej_mask2)
是否确实是True
[Update 3]使用Update 2中添加的输出,我们确实可以在第二次测试中确定处理量远小于第一次测试:
nloops_per_row = np.array([len(nouts) for nouts in nout_rows])
from collections import Counter
Counter(nloops_per_row)
Counter({2: 988, 3: 36})
因此,在测试1(矢量化版本)中,只要第二次测试(for循环)中至少有1个异常值,整个数组就会被发送到while循环3倍。在第二次测试(for循环)中,只有那些在第二次测试结束时仍然有异常值的行才能获得第三次测试,在这里,相对于测试1中的1024行,只有36个异常值
[更新4]-解决方案1
这可能并不优雅;这是一个矢量化的版本,在坚持Numpy的同时速度要快得多。(i) 它在2D中重塑立方体,(ii)它将剪切的第一个过程与其他过程分开,因此我们不需要在第一个过程中使用nanmedian()
和nanstd()
,因为它们比median()
和std()
慢,(iii)在其他过程中,它只加载和处理具有异常值的行,而不处理所有的行,不管它们的内容如何
def sigma_clip2(datacube):
# Make a 1st pass while there's no need to consider NaN-flagged arrays
# as median() and std() are faster than nanmedian() and nanstd().
sz = datacube.shape
flatc = datacube.reshape([sz[0]*sz[1], sz[2]])
rej_mask = np.zeros(flatc.shape, dtype=np.bool)
m = np.median(flatc, axis=1)
sigma = np.std(flatc, axis=1)
mask = np.abs(flatc - m[:,np.newaxis]) > 5*sigma[:,np.newaxis]
n = mask.sum()
if n == 0:
return rej_mask
# Prepare new passes only on the rows that have outliers.
clip_rows = np.where(np.any(mask, axis=1))[0]
mask = mask[clip_rows, :]
rej_mask[clip_rows, :] = mask
while n > 0:
# Work only on the rows that have outliers. Flag them as NaN
flatc = flatc[clip_rows, :]
flatc[mask] = np.nan
m = np.nanmedian(flatc, axis=1)
sigma = np.nanstd(flatc, axis=1)
mask = np.abs(flatc - m[:,np.newaxis]) > 5*sigma[:,np.newaxis]
clip_rows0 = clip_rows.copy()
clip_rows = np.where(np.any(mask, axis=1))[0]
mask = mask[clip_rows, :]
n = mask.sum()
rej_mask[clip_rows0[clip_rows]] = rej_mask[clip_rows0[clip_rows]] | mask
rej_mask = rej_mask.reshape(sz)
return rej_mask
对于相同的随机生成的数据立方体,3个不同版本的计时给出:
慢速矢量化(第1版):24 s ± 120 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
for循环(版本2):12 s ± 51.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
更快的矢量化(第3版):3.82 s ± 46.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
在后一种情况下,基本上是第1遍最长,大约在3s
。第二次传递的顺序是~200ms
,因为只有几十行包含异常值。也许第1关可以从另一个优化中获益反倾销
[Update 5]-解决方案1中的错误及其修复
执行以下操作时,输出拒绝掩码rej_mask
中的索引未正确映射:clip_rows0 = clip_rows.copy()
。如果我们只有两个过程,这是可行的,但是如果while循环第二次迭代,这是错误的(随机数据生成器进行了1或2次迭代,所以这是一个误导性的巧合,所有解的输出都是相等的)。更改为clip_rows0 = clip_rows0[clip_rows]
将提供预期的映射。这对上述性能比较没有影响。解决方案1[固定]如下
def sigma_clip2(datacube):
# Make a 1st pass while there's no need to consider NaN-flagged arrays
# as median() and std() are faster than nanmedian() and nanstd().
sz = datacube.shape
flatc = datacube.reshape([sz[0]*sz[1], sz[2]])
rej_mask = np.zeros(flatc.shape, dtype=np.bool)
m = np.median(flatc, axis=1)
sigma = np.std(flatc, axis=1)
mask = np.abs(flatc - m[:,np.newaxis]) > 5*sigma[:,np.newaxis]
n = mask.sum()
if n == 0:
return rej_mask
# Prepare new passes only on the rows that have outliers.
clip_rows = np.where(np.any(mask, axis=1))[0]
clip_rows0 = clip_rows.copy()
mask = mask[clip_rows, :]
rej_mask[clip_rows, :] = mask
while n > 0:
# Work only on the rows that have outliers. Flag them as NaN
flatc = flatc[clip_rows, :]
flatc[mask] = np.nan
m = np.nanmedian(flatc, axis=1)
sigma = np.nanstd(flatc, axis=1)
mask = np.abs(flatc - m[:,np.newaxis]) > 5*sigma[:,np.newaxis]
clip_rows = np.where(np.any(mask, axis=1))[0]
mask = mask[clip_rows, :]
n = mask.sum()
# Map back to original indices of the original array
clip_rows0 = clip_rows0[clip_rows]
rej_mask[clip_rows0] = rej_mask[clip_rows0] | mask
rej_mask = rej_mask.reshape(sz)
return rej_mask
目前没有回答
相关问题 更多 >
编程相关推荐