一定时间后中断功能

2024-09-30 08:29:06 发布

您现在位置:Python中文网/ 问答频道 /正文

在python中,以玩具为例:

for x in range(0, 3):
    # call function A(x)

如果函数A花费的时间超过5秒,我想通过跳过它来继续for循环,这样我就不会陷入困境或浪费时间。

通过一些搜索,我意识到子流程或线程可能有帮助,但我不知道如何在这里实现。 任何帮助都会很好的。 谢谢


Tags: 函数infor时间rangefunction流程call
3条回答

评论是正确的,你应该检查里面。这是一个潜在的解决方案。请注意,异步函数(例如使用线程)与此解决方案不同。这是同步的,这意味着它仍将以串行方式运行。

import time

for x in range(0,3):
    someFunction()

def someFunction():
    start = time.time()
    while (time.time() - start < 5):
        # do your normal function

    return;

如果你能把你的工作分解开来,经常检查,那几乎总是最好的解决办法。但有时这是不可能的,例如,也许你正在从一个慢文件共享中读取一个文件,每隔一段时间它就会挂起30秒。要在内部处理这个问题,必须围绕异步I/O循环重新构造整个程序。

如果不需要跨平台,可以在*nix(包括Mac和Linux)上使用信号,在Windows上使用APCs,等等,但是如果需要跨平台,那就不行。

所以,如果你真的需要同时做,你可以,有时你必须。在这种情况下,您可能希望为此使用进程,而不是线程。你不能真正安全地杀死一个线程,但是你可以杀死一个进程,它可以是你想要的那样安全。另外,如果线程由于受CPU限制而占用了5秒以上的时间,那么您不想在GIL上与它抗争。

这里有两个基本选项。


首先,您可以将代码放在另一个脚本中,并使用subprocess运行它:

subprocess.check_call([sys.executable, 'other_script.py', arg, other_arg],
                      timeout=5)

由于这是通过正常的子进程通道进行的,因此您只能使用一些argv字符串、成功/失败返回值(实际上是一个小整数,但这并不是更好的选择)和可选的大量文本进入和大量文本出来。


或者,可以使用multiprocessing生成类似线程的子进程:

p = multiprocessing.Process(func, args)
p.start()
p.join(5)
if p.is_alive():
    p.terminate()

如您所见,这有点复杂,但在以下几个方面更好:

  • 您可以传递任意的Python对象(至少可以传递任何可以pickle的对象),而不仅仅是字符串。
  • 不必将目标代码放在完全独立的脚本中,您可以将其作为函数放在同一脚本中。
  • 它更加灵活,例如,如果您以后需要传递进度更新,那么很容易在任一方向或两个方向添加队列。

任何一种并行性的最大问题都是共享可变数据——例如,让后台任务作为其工作的一部分更新全局字典(您的评论说您正在尝试这样做)。有了线程,您可以摆脱它,但竞争条件可能导致损坏的数据,所以您必须非常小心锁定。有了子进程,你根本无法逃脱。(是的,正如Sharing state between processes所解释的,您可以使用共享内存,但这仅限于简单的类型,如数字、固定数组和您知道如何定义为C结构的类型,它只会让您回到与线程相同的问题。)


理想情况下,您可以安排一些事情,这样在进程运行时就不需要共享任何数据,您可以传入一个dict作为参数,然后得到一个dict作为结果。这通常很容易安排,当你有一个以前的同步功能,你想放在后台。

但是,如果,比方说,部分结果比没有结果好呢?在这种情况下,最简单的解决方案是通过队列传递结果。如Exchanging objects between processes中所述,您可以使用显式队列来实现这一点,但有一种更简单的方法。

如果您可以将整个流程分解成单独的任务,每个任务对应一个要保存在字典中的值(或一组值),那么您可以将它们安排在Pool-或者更好的是,安排在^{}上。(如果您使用的是Python 2.x或3.1,请参见PyPI上的backport^{}。)

假设你的慢函数是这样的:

def spam():
    global d
    for meat in get_all_meats():
        count = get_meat_count(meat)
        d.setdefault(meat, 0) += count

相反,你应该这样做:

def spam_one(meat):
    count = get_meat_count(meat)
    return meat, count

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    results = executor.map(spam_one, get_canned_meats(), timeout=5)
    for (meat, count) in results:
        d.setdefault(meat, 0) += count

在5秒内得到的结果会被添加到dict中;如果不是所有的结果,则会放弃其余的结果,并引发一个TimeoutError(您可以根据需要处理日志,执行一些快速回退代码,无论怎样)。

如果任务真的是独立的(就像我愚蠢的小例子中那样,但是它们可能不在你的真实代码中,至少在没有重大重新设计的情况下),你可以通过删除tmax_workers=1。然后,如果你在一台8核机器上运行它,它将解雇8名工人,并给他们每八分之一的工作量,事情会做得更快。(通常不是8倍的速度,但通常是3-6倍的速度,这仍然是相当不错的。)

我认为创建一个新的过程可能是过火了。如果您在Mac或基于Unix的系统上,应该能够使用signal.SIGALRM强制超时花费太长时间的函数。这将适用于因网络或其他问题而空闲的函数,这些问题是通过修改函数绝对无法处理的。我有一个在这个答案中使用它的例子:

https://stackoverflow.com/a/24921763/3803152

在这里编辑我的答案,尽管我不确定我是否应该这么做:

import signal

class TimeoutException(Exception):   # Custom exception class
    pass

def timeout_handler(signum, frame):   # Custom signal handler
    raise TimeoutException

# Change the behavior of SIGALRM
signal.signal(signal.SIGALRM, timeout_handler)

for i in range(3):
    # Start the timer. Once 5 seconds are over, a SIGALRM signal is sent.
    signal.alarm(5)    
    # This try/except loop ensures that 
    #   you'll catch TimeoutException when it's sent.
    try:
        A(i) # Whatever your function that might hang
    except TimeoutException:
        continue # continue the for loop if function A takes more than 5 second
    else:
        # Reset the alarm
        signal.alarm(0)

这基本上设置了一个5秒的计时器,然后尝试执行代码。如果在时间耗尽之前无法完成,则发送一个SIGALRM,我们捕获它并将其转换为TimeoutException。这将迫使您进入except块,在这里您的程序可以继续。

编辑:哇哦,TimeoutException是一个类,不是函数。谢谢,阿伯内特!

相关问题 更多 >

    热门问题