有什么办法用吗多处理.pool在嵌套函数或模块中?

2024-06-01 08:32:53 发布

您现在位置:Python中文网/ 问答频道 /正文

谢谢你看这个。我承认我已经在python中尝试了一个星期的并行处理,所以如果我错过了一个明显的解决方案,我很抱歉。我有一段代码,我想运行多功能池(). 那些位于main.py文件中的代码运行良好,但是当我试图将它们添加到模块中的函数中时,它们都没有得到任何输出。该应用程序只运行过它并继续运行。我在想这可能与post有关,但它没有给出任何关于替代方法来完成我需要的东西的想法。一个简单示例中的代码如下:

import multiprocessing as mp
def multiproc_log_result(retval):
    results.append(retval)
    if len(results) % (10 // 10) == 0:
        print('{0}% done'.format(100 * len(results) / 10))

def meat():
    print 'beef'
    status = True
    return status
results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(meat, callback=multiproc_log_result)
pool.close()
pool.join()


def veggie():
    print 'carrot'
    status = True
    return status

results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(veggie, callback=multiproc_log_result)
pool.close()
pool.join()

但这并不管用:

^{pr2}$

最后,我希望这个例子不起作用,因为它活在一个单独的模块中的另一个函数中。所以当我导入模块packngo并将其用作packngo.basic_packngo(inputs)并将nest函数的内容放在它将运行的某个位置。任何帮助都将不胜感激。:D我是一个非常简单的人,所以如果你能像对孩子那样解释的话,我可能会记不住了!在


Tags: 模块函数代码loglendefstatusmp
2条回答

我认为问题是在multiproc_log_result的范围内,变量{}不存在。 因此,您应该做的是将异步调用的结果直接附加到结果中。 但是您将无法跟踪进度(我想没有办法在类之外直接共享回调函数的全局变量)

from multiprocessing.pool import ThreadPool

def nested_stupid_fn():
    def multiproc_log_result(retval):
        results.append(retval)

    def veggie():
        print 'carrot'
        status = True
        return status

    results = []
    pool = ThreadPool(thread_count)
    for x in range(10):
        results.append(pool.apply_async(veggie))

    pool.close()
    pool.join()

    results = [result.get() for result in results]  # get value from async result

    ...then do stuff with results

您链接的另一个问题有解决方案,只是没有详细说明:您不能使用嵌套函数作为apply*/*map*方法家族的apply*/*map*参数。它们适用于multiprocessing.dummy.Pool,因为multiprocessing.dummy由线程支持,线程可以直接传递函数引用,但是multiprocessing.Pool必须对函数进行pickle,并且只有具有可导入名称的函数才能被pickle。如果您检查一个嵌套函数的名称,它类似于modulename.outerfuncname.<locals>.innerfuncname<locals>组件使其无法导入(这通常是件好事;利用嵌套的嵌套函数通常在闭包范围内具有临界状态,而仅仅导入就会丢失这种状态)。在

以嵌套方式定义callback函数是非常好的,因为它们是在父进程中执行的,所以它们不会被发送到工作线程。在您的例子中,只有回调依赖于闭包作用域,所以将funcveggie)移出到全局范围,将packngo模块定义为:

def veggie():
    print 'carrot'
    status = True
    return status

def nested_stupid_fn():
    def multiproc_log_result(retval):
        results.append(retval)
        if len(results) % (10 // 10) == 0:
            print('{0}% done'.format(100 * len(results) / 10))

    results = []
    pool = mp.Pool(thread_count)
    for x in range(10):
        pool.apply_async(veggie, callback=multiproc_log_result)
    pool.close()
    pool.join()
nested_stupid_fn()

是的,这意味着veggie成为相关模块的公共成员。如果您想指出它应该被视为实现细节,那么可以在它前面加下划线(_veggie),但是它必须是全局的,才能与multiprocessing.Pool一起使用。在

相关问题 更多 >