`多处理池`在独立函数中

2024-10-01 17:36:44 发布

您现在位置:Python中文网/ 问答频道 /正文

因为我的last version没有得到回答,所以我们问了一个更具体的问题。你知道吗

我试图做一个重要的函数,对一系列(长)时间历史的数据帧进行平稳小波变换。实际的理论并不重要(我甚至可能没有完全正确地使用它),重要的是我正在将时间历史分解成块,并使用multiprocessing.Pool将它们提供给多个线程。你知道吗

import pandas as pd
import numpy as np
import pywt
from multiprocessing import Pool
import functools

def swt_block(arr, level = 8, wvlt = 'haar'):
    block_length = arr.shape[0]
    if block_length == 2**level:
        d = pywt.swt(arr, wvlt, axis = 0)
    elif block_length < 2**level:
        arr_ = np.pad(arr, 
                      ((0, 2**level - block_length), (0,0)), 
                      'constant', constant_values = 0)
        d = pywt.swt(arr_, wvlt, axis = 0)
    else:
        raise ValueError('block of length ' + str(arr.shape[0]) + ' too large for swt of level ' + str(level))
    out = []
    for lvl in d:
        for coeff in lvl:
            out.append(coeff)
    return np.concatenate(out, axis = -1)[:block_length]


def swt(df, wvlt = 'haar', level = 8, processors = 4):
    block_length = 2**level
    with Pool(processors) as p:
        data = p.map(functools.partial(swt_block, level = level, wvlt = wvlt), 
                     [i.values for _, i in df.groupby(np.arange(len(df)) // block_length)])
    data = np.concatenate(data, axis = 0) 
    header = pd.MultiIndex.from_product([list(range(level)),
                                     [0, 1],
                                     df.columns], 
                                     names = ['level', 'coef', 'channel'])
    df_out = pd.DataFrame(data, index = df.index, colummns = header)

    return df_out

我以前在一个独立的脚本中做过这件事,所以如果第二个函数只是包装在if __name__ == '__main__':中的裸代码,那么代码就可以工作;如果我在脚本的末尾添加一个类似的块,那么代码就可以在脚本中工作。但是如果我导入或者只是在解释器中运行上面的内容然后

df_swt = swt(df)

事情总是悬不定。我确信这是multiprocessing上的某种防护栏,可以防止我用线程做一些愚蠢的事情,但我真的不想把这段代码复制到一堆其他脚本中。包括其他标签以防他们是凶手。你知道吗


Tags: 代码import脚本dffordatanpout
1条回答
网友
1楼 · 发布于 2024-10-01 17:36:44

首先要明确的是,您正在创建多个进程,而不是线程。如果您对线程特别感兴趣,请将导入更改为:from multiprocessing.dummy import Pool。你知道吗

multiprocessingintroduction

multiprocessing is a package that supports spawning processes using an API similar to the threading module.

multprocessing.dummysection开始:

multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.

现在,我能够重现你的问题(根据你之前的链接问题),事实上同样的事情也发生了。在一个交互式shell上运行的东西只是挂起。你知道吗

然而,有趣的是,通过windowscmd运行时,屏幕上出现了一连串的错误:

RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

因此,作为猜测,我在导入模块中添加了:

if __name__ == "__main__":

和。。。。。。。。。成功了!你知道吗

只是为了澄清疑问,我将张贴在这里的确切文件,我用,所以你可以(希望)重建解决方案。。。你知道吗

multi.py中:

from multiprocessing import Pool

def __foo(x):
    return x**2

def bar(list_of_inputs):
    with Pool() as p:
        out = p.map(__foo, list_of_inputs)
    print(out)

if __name__ == "__main__":
    bar(list(range(50)))

tests.py中:

from multi import bar

l = list(range(50))

if __name__ == "__main__":
    bar(l)

运行这两个文件中的任何一个时的输出(在shell中和通过cmd):

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401]

更新:我在文档中找不到任何具体的证据来说明为什么发生这个问题,但是,很明显,它与创建新流程和importing of the main module有关。你知道吗

正如在这个答案的开头所讨论的,似乎您打算在您的意图中使用线程,而不知道您正在使用进程。如果确实是这样,那么使用实际线程将解决您的问题,并且除了import语句(改为:from multiprocessing.dummy import Pool)之外,不需要更改任何内容。对于线程,无论是在主模块中还是在导入模块中定义if __name__ == "__main__":都没有限制。所以这应该是可行的:

multi.py中:

from multiprocessing.dummy import Pool

def __foo(x):
    return x**2

def bar(list_of_inputs):
    with Pool() as p:
        out = p.map(__foo, list_of_inputs)
    print(out)

if __name__ == "__main__":
    bar(list(range(50)))

tests.py中:

from multi import bar

l = list(range(50))

bar(l)

我真的希望这有助于你解决你的问题,请让我知道如果它做。你知道吗

相关问题 更多 >

    热门问题