<p>免责声明:此答案不在windows上测试,也不在freebsd上测试。但是使用的模块应该可以在这些系统上工作。我相信这应该是你问题的有效答案-对我有用。</p>
<p>这是我在linux上破解的代码。这是几个Stackoverflow线程和我自己在Python 3文档中的研究的结合。</p>
<p>本规范的主要特点:</p>
<ul>
<li>使用进程而不是线程来阻塞I/O,因为它们可以更可靠地被p.terminated()</li>
<li>实现一个可重新触发的超时监视程序,每当发生某些输出时,它就会重新开始计数</li>
<li>实现一个长期超时监视程序以限制整个运行时</li>
<li>可以输入stdin(尽管我只需要输入一次性短字符串)</li>
<li>可以用通常的Popen方式捕获stdout/stderr(只对stdout进行编码,stderr重定向到stdout;但很容易分离)</li>
<li>它几乎是实时的,因为它只每0.2秒检查一次输出。但是你可以减少这个或者很容易地取消等待间隔</li>
<li>许多调试打印输出仍然能够看到发生了什么。</li>
</ul>
<p>唯一的代码依赖项是enum as implemented<a href="https://stackoverflow.com/questions/36932/whats-the-best-way-to-implement-an-enum-in-python/1753340#1753340">here</a>,但是可以很容易地将代码更改为不使用。它只用于区分两个超时-如果您愿意,请使用单独的异常。</p>
<p>以下是我们非常感谢的代码反馈:
(2012年6月29日编辑-代码现在实际工作)</p>
<pre><code># Python module runcmd
# Implements a class to launch shell commands which
# are killed after a timeout. Timeouts can be reset
# after each line of output
#
# Use inside other script with:
#
# import runcmd
# (return_code, out) = runcmd.RunCmd(['ls', '-l', '/etc'],
# timeout_runtime,
# timeout_no_output,
# stdin_string).go()
#
import multiprocessing
import queue
import subprocess
import time
import enum
def timestamp():
return time.strftime('%Y%m%d-%H%M%S')
class ErrorRunCmd(Exception): pass
class ErrorRunCmdTimeOut(ErrorRunCmd): pass
class Enqueue_output(multiprocessing.Process):
def __init__(self, out, queue):
multiprocessing.Process.__init__(self)
self.out = out
self.queue = queue
self.daemon = True
def run(self):
try:
for line in iter(self.out.readline, b''):
#print('worker read:', line)
self.queue.put(line)
except ValueError: pass # Readline of closed file
self.out.close()
class Enqueue_input(multiprocessing.Process):
def __init__(self, inp, iterable):
multiprocessing.Process.__init__(self)
self.inp = inp
self.iterable = iterable
self.daemon = True
def run(self):
#print("writing stdin")
for line in self.iterable:
self.inp.write(bytes(line,'utf-8'))
self.inp.close()
#print("writing stdin DONE")
class RunCmd():
"""RunCmd - class to launch shell commands
Captures and returns stdout. Kills child after a given
amount (timeout_runtime) wallclock seconds. Can also
kill after timeout_retriggerable wallclock seconds.
This second timer is reset whenever the child does some
output
(return_code, out) = RunCmd(['ls', '-l', '/etc'],
timeout_runtime,
timeout_no_output,
stdin_string).go()
"""
Timeout = enum.Enum('No','Retriggerable','Runtime')
def __init__(self, cmd, timeout_runtime, timeout_retriggerable, stdin=None):
self.dbg = False
self.cmd = cmd
self.timeout_retriggerable = timeout_retriggerable
self.timeout_runtime = timeout_runtime
self.timeout_hit = self.Timeout.No
self.stdout = '--Cmd did not yield any output--'
self.stdin = stdin
def read_queue(self, q):
time_last_output = None
try:
bstr = q.get(False) # non-blocking
if self.dbg: print('{} chars read'.format(len(bstr)))
time_last_output = time.time()
self.stdout += bstr
except queue.Empty:
#print('queue empty')
pass
return time_last_output
def go(self):
if self.stdin:
pstdin = subprocess.PIPE
else:
pstdin = None
p = subprocess.Popen(self.cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=pstdin)
pin = None
if (pstdin):
pin = Enqueue_input(p.stdin, [self.stdin + '\n'])
pin.start()
q = multiprocessing.Queue()
pout = Enqueue_output(p.stdout, q)
pout.start()
try:
if self.dbg: print('Beginning subprocess with timeout {}/{} s on {}'.format(self.timeout_retriggerable, self.timeout_runtime, time.asctime()))
time_begin = time.time()
time_last_output = time_begin
seconds_passed = 0
self.stdout = b''
once = True # ensure loop's executed at least once
# some child cmds may exit very fast, but still produce output
while once or p.poll() is None or not q.empty():
once = False
if self.dbg: print('a) {} of {}/{} secs passed and overall {} chars read'.format(seconds_passed, self.timeout_retriggerable, self.timeout_runtime, len(self.stdout)))
tlo = self.read_queue(q)
if tlo:
time_last_output = tlo
now = time.time()
if now - time_last_output >= self.timeout_retriggerable:
self.timeout_hit = self.Timeout.Retriggerable
raise ErrorRunCmdTimeOut(self)
if now - time_begin >= self.timeout_runtime:
self.timeout_hit = self.Timeout.Runtime
raise ErrorRunCmdTimeOut(self)
if q.empty():
time.sleep(0.1)
# Final try to get "last-millisecond" output
self.read_queue(q)
finally:
self._close(p, [pout, pin])
return (self.returncode, self.stdout)
def _close(self, p, procs):
if self.dbg:
if self.timeout_hit != self.Timeout.No:
print('{} A TIMEOUT occured: {}'.format(timestamp(), self.timeout_hit))
else:
print('{} No timeout occured'.format(timestamp()))
for process in [proc for proc in procs if proc]:
try:
process.terminate()
except:
print('{} Process termination raised trouble'.format(timestamp()))
raise
try:
p.stdin.close()
except: pass
if self.dbg: print('{} _closed stdin'.format(timestamp()))
try:
p.stdout.close() # If they are not closed the fds will hang around until
except: pass
if self.dbg: print('{} _closed stdout'.format(timestamp()))
#p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception
try:
p.terminate() # Important to close the fds prior to terminating the process!
# NOTE: Are there any other "non-freed" resources?
except: pass
if self.dbg: print('{} _closed Popen'.format(timestamp()))
try:
self.stdout = self.stdout.decode('utf-8')
except: pass
self.returncode = p.returncode
if self.dbg: print('{} _closed all'.format(timestamp()))
</code></pre>
<p>用于:</p>
<pre><code>import runcmd
cmd = ['ls', '-l', '/etc']
worker = runcmd.RunCmd(cmd,
40, # limit runtime [wallclock seconds]
2, # limit runtime after last output [wallclk secs]
'' # stdin input string
)
(return_code, out) = worker.go()
if worker.timeout_hit != worker.Timeout.No:
print('A TIMEOUT occured: {}'.format(worker.timeout_hit))
else:
print('No timeout occured')
print("Running '{:s}' returned {:d} and {:d} chars of output".format(cmd, return_code, len(out)))
print('Output:')
print(out)
</code></pre>
<p>第一个参数应该是命令及其参数的列表。它用于<code>Popen(shell=False)</code>调用,其超时以秒为单位。当前没有禁用超时的代码。将<code>timeout_no_output</code>设置为<code>time_runtime</code>可有效禁用可重触发<code>timeout_no_output</code>。
<code>stdin_string</code>可以是要发送到命令的标准输入的任何字符串。如果命令不需要任何输入,则设置为<code>None</code>。如果提供了字符串,则会附加最后一个“\n”。</p>