为什么 multiprocessing.pool.ThreadPool 的 terminate() 会挂起?

2024-06-26 13:47:38 发布

您现在位置:Python中文网/ 问答频道 /正文

我想用 KeyboardInterrupt 停止异步的 multiprocessing 作业。但有时在调用 terminate() 时会发生挂起。

from multiprocessing.pool import ThreadPool
import multiprocessing
import time
import queue
import inspect


def worker(index, steps=5, delay=1):
    """Simulate a slow job and report success.

    Prints a start marker, sleeps ``delay`` seconds ``steps`` times, prints a
    stop marker and returns.

    Args:
        index: Identifier of the job; echoed in the output and in the result.
        steps: Number of sleep intervals. Defaults to 5.
        delay: Length of each sleep interval in seconds. Defaults to 1.

    Returns:
        The tuple ``(index, True)``.
    """
    print('{}: start'.format(index))

    for _ in range(steps):
        time.sleep(delay)

    print('{}: stop'.format(index))
    return index, True


def wrapper(index, stopEvent, qResult):
    """Run ``worker(index)`` unless a stop was requested, and queue the result.

    Args:
        index: Job identifier passed through to ``worker``.
        stopEvent: Event-like object; when it is already set the job is
            skipped without calling ``worker``.
        qResult: Queue-like object; successful results are handed to its
            ``put`` method.

    Returns:
        ``(index, False)`` if the stop event was set or ``worker`` raised;
        otherwise the tuple returned by ``worker``.
    """
    if stopEvent.is_set():
        return index, False

    try:
        result = worker(index)
    except Exception:
        # Only ordinary errors are converted into a failed result; a bare
        # ``except`` would also swallow SystemExit and KeyboardInterrupt.
        print('*' * 50)
        return index, False

    # Only successful results are forwarded to the queue.
    if result[1]:
        qResult.put(result)
    return result


def watcher(qResult, stopEvent, timeout=10):
    """Consume and print results until a stop is requested and the queue is idle.

    Each item taken from ``qResult`` is acknowledged with ``task_done()`` and
    printed. The loop ends the first time a ``get`` times out while
    ``stopEvent`` is set. A ``KeyboardInterrupt`` raised inside the loop sets
    ``stopEvent`` and the loop keeps draining.

    Args:
        qResult: Queue-like object providing ``get(timeout=...)``,
            ``task_done()``, ``join()`` and ``close()``.
        stopEvent: Event-like object providing ``is_set()`` and ``set()``.
        timeout: Seconds to wait for an item before re-checking
            ``stopEvent``. Defaults to 10.
    """
    cntQ = 0
    while True:
        try:
            result = qResult.get(timeout=timeout)
            qResult.task_done()
        except queue.Empty:
            # Nothing arrived within the timeout: exit only if asked to stop.
            if stopEvent.is_set():
                break
        except KeyboardInterrupt:
            stopEvent.set()
        else:
            cntQ += 1
            print(result)
    qResult.join()
    qResult.close()
    print('qResult count:', cntQ)


def main():
    """Submit jobs to a ThreadPool, wait for them, then shut everything down.

    Starts ``watcher`` in a separate process, submits 100000 ``wrapper`` jobs
    to a ``ThreadPool``, waits for their results, and finally sets the stop
    event, joins the watcher process and terminates the pool. A
    ``KeyboardInterrupt`` during submission or while waiting sets the stop
    event so that the remaining work is skipped.
    """
    stopEvent = multiprocessing.Event()
    qResult = multiprocessing.JoinableQueue()
    qResult.cancel_join_thread()
    watch = multiprocessing.Process(target=watcher, args=(qResult, stopEvent))
    watch.start()
    pool = ThreadPool()

    lsRet = []
    for i in range(100000):
        try:
            ret = pool.apply_async(wrapper, args=(i, stopEvent, qResult))
            lsRet.append(ret)
            # Throttle: pause after every 10th submission. The parentheses
            # matter: ``i+1 % 10`` parses as ``i + (1 % 10)``, which is never
            # 0 here, so the pause never ran. The sleep stays inside the
            # ``try`` so Ctrl-C during the pause is handled as well.
            if (i + 1) % 10 == 0:
                time.sleep(2)
        except KeyboardInterrupt:
            stopEvent.set()
            time.sleep(1)
            break

    cntTotal = len(lsRet)
    cntRet = 0
    for ret in lsRet:
        if stopEvent.is_set():
            break
        try:
            ret.get()
        except KeyboardInterrupt:
            # The next iteration sees the event and leaves the loop.
            stopEvent.set()
            time.sleep(1)
        else:
            cntRet += 1

    # Setting an already-set event is harmless, so no check is needed.
    stopEvent.set()

    # Progress markers: file name, line number and function name.
    print(inspect.stack()[0][1:4])

    if watch.is_alive():
        watch.join()

    print(inspect.stack()[0][1:4])
    # NOTE(review): the author reports that this call hangs intermittently;
    # the cause is not established by this code.
    pool.terminate()
    print(inspect.stack()[0][1:4])
    pool.join()

    print(cntTotal, cntRet)


# Run the demo only when this file is executed as a script, so that merely
# importing the module does not start the watcher process and the pool.
if __name__ == '__main__':
    main()

main() 启动一个 watcher()(代码中是 multiprocessing.Process),并通过 multiprocessing.pool.ThreadPool 异步调用多个 wrapper() 线程。

wrapper() 调用 worker(),并将其结果放入队列。

watcher() 监视上面的结果队列。

如果按下 Ctrl-C,则设置 stopEvent。

设置 stopEvent 后,wrapper() 不再调用 worker();watcher() 在捕获到 queue.Empty 且检测到 stopEvent 已被设置时退出循环。

最后 main() 调用线程池的 terminate()。

有时程序能正常结束,但有时会挂起。每次的情况都不一样。


Tags: import true index if time is main result