我有几个速率限制器类(其中一个没有显示),我想创建一个ABC。request
方法是一个异步上下文管理器。使用下面显示的代码,我得到
Signature of "request" incompatible with supertype "RateLimiterInterface"
如果我试图用@asynccontextmanager
修饰抽象方法,就会出现一个键入错误:
Argument 1 to "asynccontextmanager" has incompatible type "Callable[[RateLimiterInterface], Coroutine[Any, Any, AsyncIterator[Any]]]"; expected "Callable[..., AsyncIterator[]]"
我该怎么做?在
class RateLimiterInterface(abc.ABC):
@abc.abstractmethod
async def request(self) -> AsyncIterator:
pass
class LeakyBucketRateLimiter(RateLimiterInterface):
def __init__(self, max_tokens: Optional[int] = None, rate: float = 60) -> None:
self.max_tokens = max_tokens
self.rate = rate
self._bucket = max_tokens
self._last_added_at = time.time()
@contextlib.asynccontextmanager
async def request(self) -> AsyncIterator:
if self._bucket is None:
yield
return
while not self._bucket:
await asyncio.sleep(0)
self._add_tokens(int((time.time() - self._last_added_at) * self.rate))
self._bucket -= 1
yield
return
def _add_tokens(self, num_tokens: int) -> None:
if num_tokens == 0:
return
self._bucket += num_tokens
if self._bucket > self.max_tokens:
self._bucket = self.max_tokens
self._last_added_at = time.time()
我不能重现这个问题。下面的代码对我来说运行得很好(python3.7.1)。在
请测试它在您的环境中是否运行良好,是否是alter-question以提供最少的可复制示例。在
我在打字时遇到了同样的问题,就这样解决了:
相关问题 更多 >
编程相关推荐