如何对一个协程进行评级,并在该限值后调用该协程?

2024-10-01 15:34:37 发布

您现在位置:Python中文网/ 问答频道 /正文

我一直在阅读这里的回复:What's a good rate limiting algorithm?

Carlos A.Ibarra的回复在没有异步功能的情况下工作得很好,但是我有什么方法可以修改它来异步工作吗?在

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)

time.sleep(leftToWait)更改为await asyncio.sleep(leftToWait)并等待PrintNumber(i)对第一个实例有效,但之后就不起作用了。我是Python的新手,我尽力遵守API的速率限制。在

我的实现:

^{pr2}$

Tags: returntimedefargssleepfuncprintdecorate
2条回答

这里最简单的方法之一是让代码重新轮询共享变量,然后循环,而不是假设当前实例将是一次睡眠后的下一个实例:

import time, asyncio

def rate_limited(max_per_second):
    min_interval = 1.0 / float(max_per_second)
    def decorate(func):
        last_time_called = [0.0]
        async def rate_limited_function(*args, **kargs):
            elapsed = time.time() - last_time_called[0]
            left_to_wait = min_interval - elapsed
            while left_to_wait > 0:
                await asyncio.sleep(left_to_wait)
                elapsed = time.time() - last_time_called[0]
                left_to_wait = min_interval - elapsed
            ret = func(*args, **kargs)
            last_time_called[0] = time.time()
            return ret
        return rate_limited_function
    return decorate

@rate_limited(0.2)
def print_number():
    print("Actually called at time: %r" % (time.time(),))

loop = asyncio.get_event_loop()
asyncio.ensure_future(print_number())
asyncio.ensure_future(print_number())
asyncio.ensure_future(print_number())
asyncio.ensure_future(print_number())
loop.run_forever()

…正确发射:

^{pr2}$

…显示通话间隔5秒(每秒0.2秒)。在

这里有一个简单的不和.py解决方案。这将使用on_command_error事件来保存命令并永远运行它,直到冷却问题得到解决,基本上是通过使用asyncio.sleep等待冷却时间:

bot = commands.Bot('?')

@bot.command(hidden=True, pass_context=True)
@commands.cooldown(1, 5, commands.BucketType.user)  # means "allow to be called 1 time every 5 seconds for this user, anywhere"
async def test(ctx):
    print("TEST")

@bot.event
async def on_command_error(exc, context: commands.Context):
    if isinstance(exc, commands.errors.CommandOnCooldown):
        while True:
            await asyncio.sleep(exc.retry_after)
            try:
                return await context.command.invoke(context)
            except commands.errors.CommandOnCooldown as e:
                exc = e

示例

在不一致(假设前缀是?):

^{pr2}$

在控制台中:

0s> TEST
5s> TEST
10s> TEST

相关问题 更多 >

    热门问题