多处理池如果返回所需结果,如何取消所有正在运行的进程?

2024-09-24 22:29:38 发布

您现在位置:Python中文网/ 问答频道 /正文

给定以下Python代码:

import multiprocessing

def unique(somelist):
    return len(set(somelist)) == len(somelist)


if __name__ == '__main__':
    somelist = [[1,2,3,4,5,6,7,8,9,10,11,12,13,2], [1,2,3,4,5], [1,2,3,4,5,6,7,8,9,1], [0,1,5,1]]

    pool = multiprocessing.Pool()
    reslist = pool.map(unique, somelist)
    pool.close()
    pool.join()
    print "Done!"

    print reslist

现在想象一下,这个玩具示例中带有整数的列表非常长,我想在这里实现以下目标:如果somelist中的一个列表返回True,则杀死所有正在运行的进程

这就引出了两个问题(可能还有更多我没有想到的问题):

  • 当其他进程正在运行时,如何从已完成的进程“读取”或“侦听”结果?例如,如果一个进程正在处理来自somelist[1,2,3,4,5],并且在所有其他进程之前完成,那么我如何在此时读取该进程的结果

  • 假设在其他进程运行时可以“读取”已完成进程的结果:如何将此结果作为终止所有其他运行进程的条件?

    e、 g.如果一个进程已完成并返回True,我如何将此作为终止所有其他(仍在运行的)进程的条件


Tags: 代码importtrue列表lenreturn进程def
2条回答

如果没有奇特的IPC(进程间通信)技巧,最简单的方法是使用带有回调函数的Pool方法。回调在主程序中运行(在由multiprocessing创建的线程中),并在每个结果可用时使用它们。当回调看到您喜欢的结果时,它可以终止Pool。比如说,

import multiprocessing as mp

def worker(i):
    from time import sleep
    sleep(i)
    return i, (i == 5)

def callback(t):
    i, quit = t
    result[i] = quit
    if quit:
        pool.terminate()

if __name__ == "__main__":
    N = 50
    pool = mp.Pool()
    result = [None] * N
    for i in range(N):
        pool.apply_async(func=worker, args=(i,), callback=callback)
    pool.close()
    pool.join()
    print(result)

这几乎肯定会显示以下内容(操作系统调度异常可能允许另一个或两个输入被消耗):

[False, False, False, False, False, True, None, None, None, None,
 None, None, None, None, None, None, None, None, None, None,
 None, None, None, None, None, None, None, None, None, None,
 None, None, None, None, None, None, None, None, None, None,
 None, None, None, None, None, None, None, None, None, None]

使用pool.imap_unordered以任意顺序查看结果

reslist = pool.imap_unordered(unique, somelist)
pool.close()
for res in reslist:
    if res:  # or set other condition here
        pool.terminate()
        break
pool.join()

当池进程仍在生成结果时,可以在主进程中的imapreslist上迭代

相关问题 更多 >