<p>当启动fastapi应用程序时,uvicorn提供的事件\循环可以以异步方式调用redis get/set value。
参考-><a href="https://fastapi.tiangolo.com/tutorial/first-steps/" rel="nofollow noreferrer">https://fastapi.tiangolo.com/tutorial/first-steps/</a></p>
<p>下面的代码片段可在“上找到”https://github.com/tiangolo/fastapi/issues/1694“已更改为适合提问者的提问</p>
<p>将创建连接结果移动到对象状态中有助于处理网络调用的异步解析</p>
<p>当路径“/”被查询时,保存连接的对象状态将用于
以异步方式将结果设置为redis</p>
<pre><code>from fastapi import FastAPI
from connections import redis_cache
app = FastAPI()
@app.on_event('startup')
async def startup_event():
await redis_cache.create_connection(r_url="redis://localhost")
@app.on_event('shutdown')
async def shutdown_event():
await redis_cache._close()
@app.get("/")
async def root():
key = 'key'
value = 'data'
await redis_cache._set(key, value)
@app.get("/value")
async def get_value():
key = 'key'
return await redis_cache._get(key)
</code></pre>
<pre><code>from typing import Optional
from aioredis import Redis, create_redis
class RedisCache:
def __init__(self) -> str:
self.redis_cache = None
async def create_connection(self,r_url):
self.redis_cache = await create_redis(r_url)
async def _get(self, key) -> str:
return await self.redis_cache.get(key)
async def _set(self, key, value) -> None:
await self.redis_cache.set(key, value)
async def _close(self) -> None:
self.redis_cache.close()
await self.redis_cache.wait_closed()
redis_cache = RedisCache()
</code></pre>