连接到redis时出现异步代码故障

2024-06-16 06:01:25 发布

您现在位置:Python中文网/ 问答频道 /正文

我创建了一个小类,使用aioredis对redis执行基本操作

class RedisService:
    def __init__(self, r_url) -> str:
        self.redis = r_url

    async def create_connection(self):
        return await aioredis.create_redis(self.redis)

    async def _get(self, key) -> str:
        try:
            return await self.create_connection().get(key, encoding='utf-8')
        finally:
            await self._close()

    async def _set(self, key, value) -> None:
        await self.create_connection().set(key, value)
        await self._close()

    async def _close(self) -> None:
        self.create_connection().close()
        await self._redis.wait_closed() 

以及一个用于调用redis的写/读操作的测试处理程序

@router.post('/perform')
async def index():
    key = 'test'
    value = 'test'
    value = await RedisService(r_url)._set(key, value)
    return {'result': value}

但是得到错误

    await self.create_connection.set(key, value)
AttributeError: ''coroutine'' object has no attribute 'set'

我猜问题可能是异步代码必须通过事件循环运行

asyncio.run(some coroutine)

但我不明白如何将这种逻辑构建到我的代码中


Tags: keyselfredisurlcloseasyncreturnvalue
2条回答

您的问题是如何使用create_connection。你必须给它打电话,等待它回来

await self.create_connection()

然后您还需要等待setget。作为单行线,这会变得混乱

await (await self.create_connection()).set(key, value)

为了帮助清理这个问题,您应该将等待拆分为单独的语句

conn = await self.create_connection()
await conn.set(key, value)

每次需要执行操作时创建新连接的成本可能会很高。我建议用一种或两种方式改变create_connection

或者让它将连接连接到您的实例

async def create_connection(self):
    self.conn = await aioredis.create_redis(self.redis)

您可以在实例化RedisService的实例并使用

await self.conn.set(key, value)

或者您可以切换到使用连接池

async def create_connection(self):
    return await aioredis.create_redis_pool(self.redis)

当启动fastapi应用程序时,uvicorn提供的事件\循环可以以异步方式调用redis get/set value。 参考->https://fastapi.tiangolo.com/tutorial/first-steps/

下面的代码片段可在“上找到”https://github.com/tiangolo/fastapi/issues/1694“已更改为适合提问者的提问

将创建连接结果移动到对象状态中有助于处理网络调用的异步解析

当路径“/”被查询时,保存连接的对象状态将用于 以异步方式将结果设置为redis

from fastapi import FastAPI
from connections import redis_cache

app = FastAPI()

@app.on_event('startup')
async def startup_event():
    await redis_cache.create_connection(r_url="redis://localhost")

@app.on_event('shutdown')
async def shutdown_event():
    await redis_cache._close()

@app.get("/")
async def root():
    key = 'key'
    value = 'data'
    await redis_cache._set(key, value)

@app.get("/value")
async def get_value():
    key = 'key'
    return await redis_cache._get(key)

from typing import Optional
from aioredis import Redis, create_redis


class RedisCache:
    def __init__(self) -> str:
        self.redis_cache = None

    async def create_connection(self,r_url):
        self.redis_cache = await create_redis(r_url)

    async def _get(self, key) -> str:
        return await self.redis_cache.get(key)

    async def _set(self, key, value) -> None:
        await self.redis_cache.set(key, value)

    async def _close(self) -> None:
        self.redis_cache.close()
        await self.redis_cache.wait_closed()

redis_cache = RedisCache()

相关问题 更多 >