如何在中调用'async for'后获得异步生成器的下一次迭代`

2024-10-02 02:28:14 发布

您现在位置:Python中文网/ 问答频道 /正文

使用FastAPI,我试图检测StreamingResponse是否已被客户端完全使用,或者是否已被取消

我有以下示例应用程序:

import asyncio

import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


async def ainfinite_generator():
    while True:
        yield b"some fake data "
        await asyncio.sleep(.001)


async def astreamer(generator):
    try:
        async for data in generator:
            yield data
    except Exception as e:
        # this isn't triggered by a cancelled request
        print(e)
    finally:
        # this always throws a StopAsyncIteration exception
        # no matter whether the generator was consumed or not
        leftover = await generator.__anext__()
        if leftover:
            print("we didn't finish")
        else:
            print("we finished")


@app.get("/")
async def infinite_stream():
    return StreamingResponse(astreamer(ainfinite_generator()))


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

看起来astreamer中的第一个async for in generator“消耗”异步生成器。在该循环之后,获取下一次迭代的进一步尝试失败,并出现StopAsyncIteration异常,即使生成器如上所述是“无限的”

我已经看过了PEP-525,我看到的唯一一件事是,如果将异常抛出到生成器中,它将导致从生成器读取的任何进一步尝试抛出StopAsyncIteration异常,但我看不到会在哪里发生这种情况。至少,我没有在Starlette的StreamingResponse class中看到这一点(它似乎与“内容”关系不大)。在执行async for in gen之后,生成器是否没有“释放”


Tags: inimportasyncioappfordataasyncdef
1条回答
网友
1楼 · 发布于 2024-10-02 02:28:14

下面的代码显示了如何监视协同路由(在我的例子中是异步生成器)上的取消。如注释中所述,如果异步生成器被取消,它将向生成器中注入异常,从那时起,任何获取生成器中下一项的尝试都将引发StopAsyncIteration异常。见PEP 525。要确定异步生成器是否被取消,只需在asyncio.CancelledError异常上尝试/except(该异常源自BaseException

这里还有一些代码显示如何处理普通的生成器,这些代码更宽容一些。如果您保持相同的try/except流,那么如果这些流被取消,将引发GeneratorExit异常

棘手的部分是,这些异常大多来自BaseException类,这与我预期的StopIteration异常不同,它来自Exception

顺便说一下,实际的取消发生在starlette

import asyncio
import time

import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


def infinite_generator():
    # not blocking, so doesn't need to be async
    # but if it was blocking, you could make this async and await it
    while True:
        yield b"some fake data "


def finite_generator():
    # not blocking, so doesn't need to be async
    # but if it was blocking, you could make this async and await it
    x = 0
    while x < 10000:
        yield f"{x}"
        x += 1


async def astreamer(generator):
    try:
        # if it was an async generator we'd do:
        # "async for data in generator:"
        # (there is no yield from async_generator)
        for i in generator:
            yield i
            await asyncio.sleep(.001)

    except asyncio.CancelledError as e:
        print('cancelled')


def streamer(generator):
    try:
        # note: normally we would do "yield from generator"
        # but that won't work with next(generator) in the finally statement
        for i in generator:
            yield i
            time.sleep(.001)

    except GeneratorExit:
        print("cancelled")
    finally:
        # showing that we can check here to see if all data was consumed
        # the except statement above effectively does the same thing
        try:
            next(generator)
            print("we didn't finish")
            return
        except StopIteration:
            print("we finished")


@app.get("/infinite")
async def infinite_stream():
    return StreamingResponse(streamer(infinite_generator()))


@app.get("/finite")
async def finite_stream():
    return StreamingResponse(streamer(finite_generator()))


@app.get("/ainfinite")
async def infinite_stream():
    return StreamingResponse(astreamer(infinite_generator()))


@app.get("/afinite")
async def finite_stream():
    return StreamingResponse(astreamer(finite_generator()))


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

相关问题 更多 >

    热门问题