基于Keras预测的Python多处理

2024-06-25 05:44:15 发布

您现在位置:Python中文网/ 问答频道 /正文

上下文

Keras模型(link here,为了MWE)需要并行地预测大量的测试数据。在

我将多维数据集定义为uint的3Dnumpy.ndarray。它的每个垂直切片是一个,即npixels=128高度,nbins=128深度。在

每次预测都会转换去噪列中的一列(大小相同)。在

我提供了三种方法:单线程、多处理和pathos包多处理。这两种多线程方法都不起作用,我不明白原因。在

代码

import keras
import numpy as np
import threading
import pathos.multiprocessing
import multiprocessing


def __res_sum_squares(y_true, y_pred):
    squared_diff = (y_true - y_pred) ** 2
    return keras.backend.sum(squared_diff)


__npixels, __nbins = 128, 128
__shape_col = (__npixels, __nbins)
__shape_nn = (1, __npixels, __nbins, 1)
__model = keras.models.load_model('./model.h5', compile=True, custom_objects={'res_sum_squares': __res_sum_squares})

__max_parallel_predictions = 4
__sema = threading.BoundedSemaphore(value=__max_parallel_predictions)


def __mt_pathos_manager(col_ratio):
    return __denoise(col_ratio[0], col_ratio[1])


def __denoise_frame_mt_pathos(frame_ratios):
    results = pathos.multiprocessing.ProcessingPool().map(__mt_pathos_manager, frame_ratios)
    return results


def __denoise_frame_mt_multiprocessing(frame_ratios):
    pool = multiprocessing.Pool()
    results = pool.map(__denoise, map(lambda col_ratio: col_ratio, frame_ratios))
    pool.close()
    pool.join()
    return results


def __denoise(col, ratio=None):
    """
        :param col: the source column
        :param ratio: logging purposes
        :return: the denoised column
    """
    really_predict = True
    if type(col) is tuple:
        col, ratio = col[0], col[1]
    col_denoise = np.reshape(col, __shape_nn)

    print("{} acquiring".format(ratio))
    __sema.acquire()
    print("{} acquired".format(ratio))
    #  ~    ~  ~  ~  ~  ~  ~  ~  ~  ~ CRITICAL SECTION START ~  ~  ~  ~  ~  ~  ~  ~  ~  ~
    col_denoise = __model.predict(col_denoise) if really_predict else col_denoise
    #  ~    ~  ~  ~  ~  ~  ~  ~  ~  ~ CRITICAL SECTION END   ~  ~  ~  ~  ~  ~  ~  ~  ~  ~
    print("{} releasing".format(ratio))
    __sema.release()
    print("{} released".format(ratio))

    return np.reshape(col_denoise, __shape_col)


def denoise_cube(cube, mp=False, mp_pathos=False):
    """
        :param cube: a numpy 3D array of ncols * npixels * nbins
        :param mp: use multiprocessing
        :param mp_pathos: use pathos multiprocessing
        :return: the denoised cube
    """
    ncol = cube.shape[0]
    ratios = [(ic * 100.0) / ncol for ic in range(0, ncol)]
    frame_ratios = zip(cube, ratios)

    if mp:
        if mp_pathos:
            l_cols_denoised = __denoise_frame_mt_pathos(frame_ratios)
        else:
            l_cols_denoised = __denoise_frame_mt_multiprocessing(frame_ratios)
    else:
        l_cols_denoised = [__denoise(col, ratio) for col, ratio in frame_ratios]
    return l_cols_denoised


if __name__ == "__main__":

    test_cube = np.random.rand(1000, __npixels, __nbins)

    # Single threaded impl: works fine
    denoise_cube(test_cube, mp=False)
    # Multiprocessing Pool: blocks at the eighth "acquired" print
    denoise_cube(test_cube, mp=True, mp_pathos=False)
    # Pathos multiprocessing Pool: blocks at the eighth "acquired" print
    denoise_cube(test_cube, mp=True, mp_pathos=True)

分析

我猜测的第一件事是,在8次调用(即测试机器上的cpu核数)之后,对__model.predict()的抢购是阻塞的。 所以我放置了一个访问量小于8的threading.BoundedSemaphore。什么都不管用。在

单螺纹工作正常:

^{pr2}$

多处理(两个版本)没有

0.0 acquiring
0.0 acquired
3.2 acquiring
3.2 acquired
6.4 acquiring
6.4 acquired
9.6 acquiring
9.6 acquired
12.8 acquiring
12.8 acquired
16.0 acquiring
16.0 acquired
19.2 acquiring
19.2 acquired
22.4 acquiring
22.4 acquired
< hangs >

等等,release指纹在哪里?似乎信号量没有被触摸,或者每次调用都被复制,并且总是重新初始化。嗯

所以让我们寻找really_predict = True并交换它的值:predict()调用永远不会以这种方式到达。在

。。。。这很好,很好!!!所以问题并不是完全可以解决multiprocessing,而是keras预测和{}池之间的一个奇怪的联系。有什么建议吗?在


Tags: returndefcolmpmultiprocessingframecuberatio