同时监视子进程的stdout和stderr

2024-05-05 12:20:44 发布

您现在位置:Python中文网/ 问答频道 /正文

如何同时监视长时间运行的子进程的标准输出和标准错误,并在子进程生成每一行时立即对其进行处理?在

我不介意使用Python3.6的异步工具在两个流中的每一个上实现我所期望的无阻塞异步循环,但这似乎并不能解决问题。以下代码:

import asyncio
from asyncio.subprocess import PIPE
from datetime import datetime


async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in p.stdout:
        print(datetime.now(), f.decode().strip())
    async for f in p.stderr:
        print(datetime.now(), "E:", f.decode().strip())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run('''
         echo "Out 1";
         sleep 1;
         echo "Err 1" >&2;
         sleep 1;
         echo "Out 2"
    '''))
    loop.close()

输出:

^{pr2}$

我希望它能输出类似于:

2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:36.770882 E: Err 1
2018-06-18 00:06:37.770187 Out 2

Tags: runfromimportechocmdloopasyncio标准
2条回答

要实现这一点,您需要一个函数,该函数将获取两个异步序列,并将它们合并到一起,在它们可用时从其中一个或另一个生成结果。{cd1>中的stock可以是这样的函数:

async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in merge(p.stdout, p.stderr):
        print(datetime.now(), f.decode().strip())

merge这样的函数在标准库中还不存在,但是aiostream外部库provides one。您也可以使用异步生成器和asyncio.wait()编写自己的程序:

^{pr2}$

上面的run仍然与您期望的输出有一个细节上的不同:它不会区分输出和错误行。但这很容易通过在线条上装饰一个指示器来实现:

async def decorate_with(it, prefix):
    async for item in it:
        yield prefix, item

async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for is_out, line in merge(decorate_with(p.stdout, True),
                                    decorate_with(p.stderr, False)):
        if is_out:
            print(datetime.now(), line.decode().strip())
        else:
            print(datetime.now(), "E:", line.decode().strip())

在我看来,实际上有一个更简单的解决方案来解决这个问题,至少如果监视代码不需要在一个协程调用中。在

您可以生成两个独立的协同程序,一个用于stdout,一个用于stderr。并行运行它们将为您提供所需的语义,您可以使用gather等待它们的完成:

def watch(stream, prefix=''):
    async for line in stream:
        print(datetime.now(), prefix, line.decode().strip())

async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    await asyncio.gather(watch(p.stdout), watch(p.stderr, 'E:'))

相关问题 更多 >