使用FastAPI,我试图检测StreamingResponse是否已被客户端完全使用,或者是否已被取消
我有以下示例应用程序:
import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
async def ainfinite_generator():
while True:
yield b"some fake data "
await asyncio.sleep(.001)
async def astreamer(generator):
try:
async for data in generator:
yield data
except Exception as e:
# this isn't triggered by a cancelled request
print(e)
finally:
# this always throws a StopAsyncIteration exception
# no matter whether the generator was consumed or not
leftover = await generator.__anext__()
if leftover:
print("we didn't finish")
else:
print("we finished")
@app.get("/")
async def infinite_stream():
return StreamingResponse(astreamer(ainfinite_generator()))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
看起来astreamer
中的第一个async for in generator
“消耗”异步生成器。在该循环之后,获取下一次迭代的进一步尝试失败,并出现StopAsyncIteration
异常,即使生成器如上所述是“无限的”
我已经看过了PEP-525,我看到的唯一一件事是,如果将异常抛出到生成器中,它将导致从生成器读取的任何进一步尝试抛出StopAsyncIteration异常,但我看不到会在哪里发生这种情况。至少,我没有在Starlette的StreamingResponse class中看到这一点(它似乎与“内容”关系不大)。在执行async for in gen
之后,生成器是否没有“释放”
下面的代码显示了如何监视协同路由(在我的例子中是异步生成器)上的取消。如注释中所述,如果异步生成器被取消,它将向生成器中注入异常,从那时起,任何获取生成器中下一项的尝试都将引发
StopAsyncIteration
异常。见PEP 525。要确定异步生成器是否被取消,只需在asyncio.CancelledError
异常上尝试/except(该异常源自BaseException
)这里还有一些代码显示如何处理普通的生成器,这些代码更宽容一些。如果您保持相同的try/except流,那么如果这些流被取消,将引发
GeneratorExit
异常棘手的部分是,这些异常大多来自
BaseException
类,这与我预期的StopIteration
异常不同,它来自Exception
类顺便说一下,实际的取消发生在starlette
相关问题 更多 >
编程相关推荐