Python子进程超时并具有大于64K的输出

2024-05-02 17:44:12 发布

您现在位置:Python中文网/ 问答频道 /正文

我想执行一个进程,将执行时间限制为以秒为单位的超时,并获取该进程产生的输出。我想在windows,linux和freebsd上做这个。

我试着用三种不同的方式来实现这一点:

  1. 没有超时和输出捕获的subprocess.PIPE。

    行为:按预期操作但不支持超时,我需要超时。。。

  2. cmd_to-带超时和用于输出捕获的subprocess.PIPE。

    行为:当输出>;=2^16字节时阻止子进程执行。

  3. cmd_totf-带超时和tempfile.NamedTemporaryfile用于输出捕获。

    行为:按预期操作,但使用磁盘上的临时文件。

下面提供这些以供进一步检查。

从下面的输出中可以看出,使用subprocessing.PIPE时,超时代码会阻止子进程的执行,并且子进程的输出为2^16字节。

子流程文档指出,调用process.wait()和使用subprocessing.PIPE时会出现这种情况,但是在使用process.poll()时不会发出警告,所以这里出了什么问题?

我在cmd_totf中有一个使用tempfile模块的解决方案,但是折衷的办法是它将输出写到磁盘,这是我真正想避免的。

所以我的问题是:

  • 我在命令里做错了什么?
  • 有没有办法做我想做的事而不使用tempfiles/将输出保存在内存中。

生成一组输出(“exp_gen.py”)的脚本:

#!/usr/bin/env python
import sys
output  = "b"*int(sys.argv[1])
print output

子流程周围的包装器有三种不同的实现方式(cmd、cmd、cmd)

#!/usr/bin/env python
import subprocess, time, tempfile
bufsize = -1

def cmd(cmdline, timeout=60):
  """
  Execute cmdline.
  Uses subprocessing and subprocess.PIPE.
  """

  p = subprocess.Popen(
    cmdline,
    bufsize = bufsize,
    shell   = False,
    stdin   = subprocess.PIPE,
    stdout  = subprocess.PIPE,
    stderr  = subprocess.PIPE
  )

  out, err    = p.communicate()
  returncode  = p.returncode

  return (returncode, err, out)

def cmd_to(cmdline, timeout=60):
  """
  Execute cmdline, limit execution time to 'timeout' seconds.
  Uses subprocessing and subprocess.PIPE.
  """

  p = subprocess.Popen(
    cmdline,
    bufsize = bufsize,
    shell   = False,
    stdin   = subprocess.PIPE,
    stdout  = subprocess.PIPE,
    stderr  = subprocess.PIPE
  )

  t_begin         = time.time()             # Monitor execution time
  seconds_passed  = 0  

  while p.poll() is None and seconds_passed < timeout:
    seconds_passed = time.time() - t_begin
    time.sleep(0.1)

  #if seconds_passed > timeout:
  #
  #  try:
  #    p.stdout.close()  # If they are not closed the fds will hang around until
  #    p.stderr.close()  # os.fdlimit is exceeded and cause a nasty exception
  #    p.terminate()     # Important to close the fds prior to terminating the process!
  #                      # NOTE: Are there any other "non-freed" resources?
  #  except:
  #    pass
  #  
  #  raise TimeoutInterrupt

  out, err    = p.communicate()
  returncode  = p.returncode

  return (returncode, err, out)

def cmd_totf(cmdline, timeout=60):
  """
  Execute cmdline, limit execution time to 'timeout' seconds.
  Uses subprocessing and tempfile instead of subprocessing.PIPE.
  """

  output  = tempfile.NamedTemporaryFile(delete=False)
  error   = tempfile.NamedTemporaryFile(delete=False)

  p = subprocess.Popen(
    cmdline,
    bufsize = 0,
    shell   = False,
    stdin   = None,
    stdout  = output,
    stderr  = error
  )

  t_begin         = time.time()             # Monitor execution time
  seconds_passed  = 0  

  while p.poll() is None and seconds_passed < timeout:
    seconds_passed = time.time() - t_begin
    time.sleep(0.1)

  #if seconds_passed > timeout:
  #
  #  try:
  #    p.stdout.close()  # If they are not closed the fds will hang around until
  #    p.stderr.close()  # os.fdlimit is exceeded and cause a nasty exception
  #    p.terminate()     # Important to close the fds prior to terminating the process!
  #                      # NOTE: Are there any other "non-freed" resources?
  #  except:
  #    pass
  #  
  #  raise TimeoutInterrupt

  p.wait()

  returncode  = p.returncode

  fd          = open(output.name)
  out         = fd.read()
  fd.close()

  fd  = open(error.name)
  err = fd.read()
  fd.close()

  error.close()
  output.close()

  return (returncode, err, out)

if __name__ == "__main__":

  implementations = [cmd, cmd_to, cmd_totf]
  bytes     = ['65535', '65536', str(1024*1024)]
  timeouts  = [5]

  for timeout in timeouts:    
    for size in bytes:    
      for i in implementations:
        t_begin         = time.time()
        seconds_passed  = 0        
        rc, err, output = i(['exp_gen.py', size], timeout)
        seconds_passed = time.time() - t_begin
        filler = ' '*(8-len(i.func_name))
        print "[%s%s:  timeout=%d,  iosize=%s,  seconds=%f]" % (repr(i.func_name), filler, timeout, size, seconds_passed)

执行输出:

['cmd'     :  timeout=5,  iosize=65535,  seconds=0.016447]
['cmd_to'  :  timeout=5,  iosize=65535,  seconds=0.103022]
['cmd_totf':  timeout=5,  iosize=65535,  seconds=0.107176]
['cmd'     :  timeout=5,  iosize=65536,  seconds=0.028105]
['cmd_to'  :  timeout=5,  iosize=65536,  seconds=5.116658]
['cmd_totf':  timeout=5,  iosize=65536,  seconds=0.104905]
['cmd'     :  timeout=5,  iosize=1048576,  seconds=0.025964]
['cmd_to'  :  timeout=5,  iosize=1048576,  seconds=5.128062]
['cmd_totf':  timeout=5,  iosize=1048576,  seconds=0.103183]

Tags: andtocmdcloseoutputtimetimeoutsubprocess
2条回答

免责声明:此答案不在windows上测试,也不在freebsd上测试。但是使用的模块应该可以在这些系统上工作。我相信这应该是你问题的有效答案-对我有用。

这是我在linux上破解的代码。这是几个Stackoverflow线程和我自己在Python 3文档中的研究的结合。

本规范的主要特点:

  • 使用进程而不是线程来阻塞I/O,因为它们可以更可靠地被p.terminated()
  • 实现一个可重新触发的超时监视程序,每当发生某些输出时,它就会重新开始计数
  • 实现一个长期超时监视程序以限制整个运行时
  • 可以输入stdin(尽管我只需要输入一次性短字符串)
  • 可以用通常的Popen方式捕获stdout/stderr(只对stdout进行编码,stderr重定向到stdout;但很容易分离)
  • 它几乎是实时的,因为它只每0.2秒检查一次输出。但是你可以减少这个或者很容易地取消等待间隔
  • 许多调试打印输出仍然能够看到发生了什么。

唯一的代码依赖项是enum as implementedhere,但是可以很容易地将代码更改为不使用。它只用于区分两个超时-如果您愿意,请使用单独的异常。

以下是我们非常感谢的代码反馈: (2012年6月29日编辑-代码现在实际工作)

# Python module runcmd
# Implements a class to launch shell commands which
# are killed after a timeout. Timeouts can be reset
# after each line of output
#
# Use inside other script with:
#
# import runcmd
# (return_code, out) = runcmd.RunCmd(['ls', '-l', '/etc'],
#                                    timeout_runtime,
#                                    timeout_no_output,
#                                    stdin_string).go()
#

import multiprocessing
import queue
import subprocess
import time

import enum

def timestamp():
    return time.strftime('%Y%m%d-%H%M%S')


class ErrorRunCmd(Exception): pass
class ErrorRunCmdTimeOut(ErrorRunCmd): pass

class Enqueue_output(multiprocessing.Process):
    def __init__(self, out, queue):
        multiprocessing.Process.__init__(self)
        self.out = out
        self.queue = queue
        self.daemon = True
    def run(self):
        try:
            for line in iter(self.out.readline, b''):
                #print('worker read:', line)
                self.queue.put(line)
        except ValueError: pass # Readline of closed file
        self.out.close()
class Enqueue_input(multiprocessing.Process):
    def __init__(self, inp, iterable):
        multiprocessing.Process.__init__(self)
        self.inp = inp
        self.iterable = iterable
        self.daemon = True
    def run(self):
        #print("writing stdin")
        for line in self.iterable:
            self.inp.write(bytes(line,'utf-8'))
        self.inp.close()
        #print("writing stdin DONE")

class RunCmd():
    """RunCmd - class to launch shell commands

    Captures and returns stdout. Kills child after a given
    amount (timeout_runtime) wallclock seconds. Can also
    kill after timeout_retriggerable wallclock seconds.
    This second timer is reset whenever the child does some
    output

       (return_code, out) = RunCmd(['ls', '-l', '/etc'],
                                   timeout_runtime,
                                   timeout_no_output,
                                   stdin_string).go()

    """
    Timeout = enum.Enum('No','Retriggerable','Runtime')

    def __init__(self, cmd, timeout_runtime, timeout_retriggerable, stdin=None):
        self.dbg = False
        self.cmd = cmd
        self.timeout_retriggerable = timeout_retriggerable
        self.timeout_runtime = timeout_runtime
        self.timeout_hit = self.Timeout.No
        self.stdout = '--Cmd did not yield any output--'
        self.stdin = stdin
    def read_queue(self, q):
        time_last_output = None
        try:
            bstr = q.get(False) # non-blocking
            if self.dbg: print('{} chars read'.format(len(bstr)))
            time_last_output = time.time()
            self.stdout += bstr
        except queue.Empty:
            #print('queue empty')
            pass
        return time_last_output
    def go(self):
        if self.stdin:
            pstdin = subprocess.PIPE
        else:
            pstdin = None
        p = subprocess.Popen(self.cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=pstdin)
        pin = None
        if (pstdin):
            pin = Enqueue_input(p.stdin, [self.stdin + '\n'])
            pin.start()
        q = multiprocessing.Queue()
        pout = Enqueue_output(p.stdout, q)
        pout.start()
        try:
            if self.dbg: print('Beginning subprocess with timeout {}/{} s on {}'.format(self.timeout_retriggerable, self.timeout_runtime, time.asctime()))
            time_begin = time.time()
            time_last_output = time_begin
            seconds_passed = 0
            self.stdout = b''
            once = True                 # ensure loop's executed at least once
                                        # some child cmds may exit very fast, but still produce output
            while once or p.poll() is None or not q.empty():
                once = False
                if self.dbg: print('a) {} of {}/{} secs passed and overall {} chars read'.format(seconds_passed, self.timeout_retriggerable, self.timeout_runtime, len(self.stdout)))

                tlo = self.read_queue(q)
                if tlo:
                    time_last_output = tlo

                now = time.time()
                if now - time_last_output >= self.timeout_retriggerable:
                    self.timeout_hit = self.Timeout.Retriggerable
                    raise ErrorRunCmdTimeOut(self)
                if now - time_begin >= self.timeout_runtime:
                    self.timeout_hit = self.Timeout.Runtime
                    raise ErrorRunCmdTimeOut(self)

                if q.empty():
                    time.sleep(0.1)
            # Final try to get "last-millisecond" output
            self.read_queue(q)            
        finally:
            self._close(p, [pout, pin])            
        return (self.returncode, self.stdout)               

    def _close(self, p, procs):
        if self.dbg:
            if self.timeout_hit != self.Timeout.No:
                print('{} A TIMEOUT occured: {}'.format(timestamp(), self.timeout_hit))
            else:
                print('{} No timeout occured'.format(timestamp()))
        for process in [proc for proc in procs if proc]:
            try:
                process.terminate()
            except:
                print('{} Process termination raised trouble'.format(timestamp()))
                raise
        try:
            p.stdin.close()
        except: pass
        if self.dbg: print('{} _closed stdin'.format(timestamp()))
        try:
            p.stdout.close()    # If they are not closed the fds will hang around until
        except: pass
        if self.dbg: print('{} _closed stdout'.format(timestamp()))
            #p.stderr.close()   # os.fdlimit is exceeded and cause a nasty exception
        try:
            p.terminate()       # Important to close the fds prior to terminating the process!
                                # NOTE: Are there any other "non-freed" resources?
        except: pass
        if self.dbg: print('{} _closed Popen'.format(timestamp()))
        try:
            self.stdout = self.stdout.decode('utf-8')
        except: pass
        self.returncode = p.returncode
        if self.dbg: print('{} _closed all'.format(timestamp()))

用于:

import runcmd

cmd = ['ls', '-l', '/etc']

worker = runcmd.RunCmd(cmd,
                       40,    # limit runtime [wallclock seconds]
                       2,     # limit runtime after last output [wallclk secs]
                       ''     # stdin input string
                       )
(return_code, out) = worker.go()

if worker.timeout_hit != worker.Timeout.No:
    print('A TIMEOUT occured: {}'.format(worker.timeout_hit))
else:
    print('No timeout occured')


print("Running '{:s}' returned {:d} and {:d} chars of output".format(cmd, return_code, len(out)))
print('Output:')
print(out)

第一个参数应该是命令及其参数的列表。它用于Popen(shell=False)调用,其超时以秒为单位。当前没有禁用超时的代码。将timeout_no_output设置为time_runtime可有效禁用可重触发timeout_no_outputstdin_string可以是要发送到命令的标准输入的任何字符串。如果命令不需要任何输入,则设置为None。如果提供了字符串,则会附加最后一个“\n”。

与子流程文档中的所有警告相反,直接读取process.stdout和process.stderr提供了更好的解决方案。

我的意思是,我可以读取超过2^16字节的进程的输出,而不必临时将输出存储在磁盘上。

代码如下:

import fcntl
import os
import subprocess
import time

def nonBlockRead(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ''

def cmd(cmdline, timeout=60):
    """
    Execute cmdline, limit execution time to 'timeout' seconds.
    Uses the subprocess module and subprocess.PIPE.

    Raises TimeoutInterrupt
    """

    p = subprocess.Popen(
        cmdline,
        bufsize = bufsize, # default value of 0 (unbuffered) is best
        shell   = False, # not really needed; it's disabled by default
        stdout  = subprocess.PIPE,
        stderr  = subprocess.PIPE
    )

    t_begin = time.time() # Monitor execution time
    seconds_passed = 0

    stdout = ''
    stderr = ''

    while p.poll() is None and seconds_passed < timeout: # Monitor process
        time.sleep(0.1) # Wait a little
        seconds_passed = time.time() - t_begin

        # p.std* blocks on read(), which messes up the timeout timer.
        # To fix this, we use a nonblocking read()
        # Note: Not sure if this is Windows compatible
        stdout += nonBlockRead(p.stdout)
        stderr += nonBlockRead(p.stderr)

    if seconds_passed >= timeout:
        try:
            p.stdout.close()  # If they are not closed the fds will hang around until
            p.stderr.close()  # os.fdlimit is exceeded and cause a nasty exception
            p.terminate()     # Important to close the fds prior to terminating the process!
                              # NOTE: Are there any other "non-freed" resources?
        except:
            pass

        raise TimeoutInterrupt

    returncode  = p.returncode

    return (returncode, stdout, stderr)

相关问题 更多 >