Python中的超时功能

2024-10-06 10:22:44 发布

您现在位置:Python中文网/ 问答频道 /正文

我想在Python(3.x)中有一个函数,它强制脚本本身终止,比如:

i_time_value = 10
mytimeout(i_time_value )   # Terminate the script if not in i_time_value seconds 
for i in range(10):
   print("go")
   time.sleep(2)

其中“mytimeout”是我需要的函数:如果脚本没有终止,它将在“arg”秒内终止脚本。在

我已经看到了给函数herehere设置超时的良好解决方案,但我不希望函数超时,而是脚本超时。在

同时:

  • 我知道我可以将我的脚本放在function中,或者使用类似于subprocess的东西并将其与 暂停,我试过了,但我想要更简单的。
  • 它必须与Unix和Windows兼容。
  • 函数必须是通用的,即:它可以添加到 一行(导入除外)
  • 我需要一个函数,而不是“如何在脚本中设置超时”。

Tags: the函数in脚本forifheretime
3条回答

我会用这样的东西。在

import sys
import time
import threading

def set_timeout(event):
    event.set()

event = threading.Event()
i_time_value = 2

t = threading.Timer(i_time_value, set_timeout, [event])
t.start()

for i in range(10):

    print("go")

    if event.is_set():
        print('Timed Out!')
        sys.exit()

    time.sleep(2)

signal is not Windows compatible.

您可以在Windows上发送一些信号,例如:

os.kill(os.getpid(), signal.CTRL_C_EVENT) # send Ctrl+C to itself

以后可以使用threading.Timer调用函数:

^{pr2}$

其中kill_yourself_now()

import os
import signal
import sys

def kill_yourself_now():
    sig = signal.CTRL_C_EVENT if sys.platform == 'win32' else signal.SIGINT
    os.kill(os.getpid(), sig) # raise KeyboardInterrupt in the main thread

如果脚本启动其他进程,请参阅:how to kill child process(es) when parent dies?另请参阅,How to terminate a python subprocess launched with shell=True它演示了如何终止进程树。在

在谷歌上搜索了一下this answer

import multiprocessing as MP
from sys import exc_info
from time import clock

DEFAULT_TIMEOUT = 60

################################################################################

def timeout(limit=None):
    if limit is None:
        limit = DEFAULT_TIMEOUT
    if limit <= 0:
        raise ValueError()
    def wrapper(function):
        return _Timeout(function, limit)
    return wrapper

class TimeoutError(Exception): pass

################################################################################

def _target(queue, function, *args, **kwargs):
    try:
        queue.put((True, function(*args, **kwargs)))
    except:
        queue.put((False, exc_info()[1]))

class _Timeout:

    def __init__(self, function, limit):
        self.__limit = limit
        self.__function = function
        self.__timeout = clock()
        self.__process = MP.Process()
        self.__queue = MP.Queue()

    def __call__(self, *args, **kwargs):
        self.cancel()
        self.__queue = MP.Queue(1)
        args = (self.__queue, self.__function) + args
        self.__process = MP.Process(target=_target, args=args, kwargs=kwargs)
        self.__process.daemon = True
        self.__process.start()
        self.__timeout = self.__limit + clock()

    def cancel(self):
        if self.__process.is_alive():
            self.__process.terminate()

    @property
    def ready(self):
        if self.__queue.full():
            return True
        elif not self.__queue.empty():
            return True
        elif self.__timeout < clock():
            self.cancel()
        else:
            return False

    @property
    def value(self):
        if self.ready is True:
            flag, load = self.__queue.get()
            if flag:
                return load
            raise load
        raise TimeoutError()

    def __get_limit(self):
        return self.__limit

    def __set_limit(self, value):
        if value <= 0:
            raise ValueError()
        self.__limit = value

    limit = property(__get_limit, __set_limit)

它可能是Python2.x,但转换起来应该不难。在

相关问题 更多 >