一旦Python的一个工作线程满足某个条件,就终止它的多处理程序

2024-05-17 04:35:55 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用它的多处理模块编写一个Python程序。该程序调用许多辅助函数,每个函数都产生一个随机数。一旦其中一个工人产生大于0.7的数字,我就需要终止程序。

下面是我的程序,其中“如何执行此操作”部分尚未填写。知道吗?谢谢。

import time
import numpy as np
import multiprocessing as mp
import time
import sys

def f(i):
    np.random.seed(int(time.time()+i))

    time.sleep(3)
    res=np.random.rand()
    print "From i = ",i, "       res = ",res
    if res>0.7:
        print "find it"
        # terminate  ???? Question: How to do this???


if __name__=='__main__':
    num_workers=mp.cpu_count()
    pool=mp.Pool(num_workers)
    for i in range(num_workers):
        p=mp.Process(target=f,args=(i,))
        p.start()

Tags: 模块函数import程序iftimeasnp
3条回答

有一种更干净的python方法来完成您想做的事情,它是通过使用multiprocessing.Pool提供的回调函数来实现的。

您可以检查this question以查看实现示例。

任何进程都无法阻止类似于大锤的暴力。别去那儿。

要明智地做到这一点,您需要重新设计您的基本方法:主流程和工作流程需要相互通信。

我想充实一下,但到目前为止的例子是没有什么用处。例如,如前所述,对rand()的调用不超过num_workers,因此没有理由相信其中任何调用必须是>;0.7。

一旦worker函数增长了一个循环,它就变得更加明显。例如,工作者可以检查是否在循环的顶部设置了mp.Event,如果设置了,则退出。主进程将在希望工人停止时设置Event

当工人发现一个值>;0.7时,他可以设置一个不同的mp.Event。主进程将等待Event,然后设置“停止时间”Event供工作人员查看,然后执行常规的循环.join()-让工作人员进行干净的关闭。

编辑

假设工人们将继续工作,直到至少有人找到一个值>;0.7,这里将充实出一个便携、干净的解决方案。请注意,我从中删除了numpy,因为它与此代码无关。这里的代码在任何支持multiprocessing的平台上的任何股票Python下都应该可以正常工作:

import random
from time import sleep

def worker(i, quit, foundit):
    print "%d started" % i
    while not quit.is_set():
        x = random.random()
        if x > 0.7:
            print '%d found %g' % (i, x)
            foundit.set()
            break
        sleep(0.1)
    print "%d is done" % i

if __name__ == "__main__":
    import multiprocessing as mp
    quit = mp.Event()
    foundit = mp.Event()
    for i in range(mp.cpu_count()):
        p = mp.Process(target=worker, args=(i, quit, foundit))
        p.start()
    foundit.wait()
    quit.set()

以及一些示例输出:

0 started
1 started
2 started
2 found 0.922803
2 is done
3 started
3 is done
4 started
4 is done
5 started
5 is done
6 started
6 is done
7 started
7 is done
0 is done
1 is done

一切都会干净地关闭:没有回溯,没有异常终止,没有留下僵尸进程。。。干净利落。

杀了它

正如@nosdafox所指出的,有一个Pool.terminate()方法尽其所能,跨平台杀死工作进程,不管它们在做什么(例如,在Windows上,它调用平台TerminateProcess())。我不建议在生产代码中使用它,因为突然终止进程可能会使各种共享资源处于不一致的状态,或者让它们泄漏。在multiprocessing文档中有各种关于这个的警告,您应该在其中添加操作系统文档。

不过,这可能是权宜之计!这是一个使用这种方法的完整程序。请注意,我将截止值提高到了0.95,使运行此操作比运行一个眼球链接更可能需要更长的时间:

import random
from time import sleep

def worker(i):
    print "%d started" % i
    while True:
        x = random.random()
        print '%d found %g' % (i, x)
        if x > 0.95:
            return x # triggers callback
        sleep(0.5)

# callback running only in __main__
def quit(arg):
    print "quitting with %g" % arg
    # note: p is visible because it's global in __main__
    p.terminate()  # kill all pool workers

if __name__ == "__main__":
    import multiprocessing as mp
    ncpu = mp.cpu_count()
    p = mp.Pool(ncpu)
    for i in range(ncpu):
        p.apply_async(worker, args=(i,), callback=quit)
    p.close()
    p.join()

以及一些示例输出:

$ python mptest.py
0 started
0 found 0.391351
1 started
1 found 0.767374
2 started
2 found 0.110969
3 started
3 found 0.611442
4 started
4 found 0.790782
5 started
5 found 0.554611
6 started
6 found 0.0483844
7 started
7 found 0.862496
0 found 0.27175
1 found 0.0398836
2 found 0.884015
3 found 0.988702
quitting with 0.988702
4 found 0.909178
5 found 0.336805
6 found 0.961192
7 found 0.912875
$ [the program ended]

正如其他一个用户所提到的,您需要这些进程相互通信,以便让它们终止其对等方。虽然可以使用os.kill终止对等进程,但发出终止信号更为方便。

我使用的解决方案非常简单: 一。找出主进程的进程ID(pid),它生成所有其他工作进程。此连接信息可从OS获得,OS跟踪从哪个父进程派生的子进程。 2。当其中一个工作进程达到您的结束条件时,它使用父进程ID查找主进程(包括它自己)的所有子进程,然后遍历列表并向它们发送结束信号(确保它没有向自己发送信号) 下面的代码包含工作解决方案。

import time
import numpy as np
import multiprocessing as mp
import time
import sys
import os
import psutil
import signal

pid_array = []

def f(i):
    np.random.seed(int(time.time()+i))

    time.sleep(3)
    res=np.random.rand()
    current_process = os.getpid()
    print "From i = ",i, "       res = ",res, " with process ID (pid) = ", current_process
    if res>0.7:
        print "find it"
        # solution: use the parent child connection between processes
        parent = psutil.Process(main_process)
        children = parent.children(recursive=True)
        for process in children:
            if not (process.pid == current_process):
                print "Process: ",current_process,  " killed process: ", process.pid
                process.send_signal(signal.SIGTERM)


if __name__=='__main__':
    num_workers=mp.cpu_count()
    pool=mp.Pool(num_workers)
    main_process = os.getpid()
    print "Main process: ", main_process
    for i in range(num_workers):
        p=mp.Process(target=f,args=(i,))
        p.start()

输出清楚地说明了正在发生的事情:

Main process:  30249
From i =  0        res =  0.224609517693  with process ID (pid) =  30259
From i =  1        res =  0.470935062176  with process ID (pid) =  30260
From i =  2        res =  0.493680214732  with process ID (pid) =  30261
From i =  3        res =  0.342349294134  with process ID (pid) =  30262
From i =  4        res =  0.149124648092  with process ID (pid) =  30263
From i =  5        res =  0.0134122107375  with process ID (pid) =  30264
From i =  6        res =  0.719062852901  with process ID (pid) =  30265
find it
From i =  7        res =  0.663682945388  with process ID (pid) =  30266
Process:  30265  killed process:  30259
Process:  30265  killed process:  30260
Process:  30265  killed process:  30261
Process:  30265  killed process:  30262
Process:  30265  killed process:  30263
Process:  30265  killed process:  30264
Process:  30265  killed process:  30266

相关问题 更多 >