如何定期使用asyncio执行函数?

2024-05-20 11:37:00 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在从tornado迁移到asyncio,找不到asyncio等价于tornadoPeriodicCallback。(APeriodicCallback需要两个参数:要运行的函数和调用之间的毫秒数。)

  • asyncio中有这样的等价物吗?
  • 如果没有,那么在不冒一段时间后会出现RecursionError的风险的情况下,实现这一点的最干净方法是什么?

Tags: 方法函数asyncio参数情况tornado风险等价
3条回答

当您觉得应该在异步程序的“后台”发生某些事情时,asyncio.Task可能是一个很好的方法。您可以阅读this post来了解如何处理任务。

下面是周期性执行某些函数的类的可能实现:

import asyncio
from contextlib import suppress


class Periodic:
    def __init__(self, func, time):
        self.func = func
        self.time = time
        self.is_started = False
        self._task = None

    async def start(self):
        if not self.is_started:
            self.is_started = True
            # Start task to call func periodically:
            self._task = asyncio.ensure_future(self._run())

    async def stop(self):
        if self.is_started:
            self.is_started = False
            # Stop task and await it stopped:
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task

    async def _run(self):
        while True:
            await asyncio.sleep(self.time)
            self.func()

我们来测试一下:

async def main():
    p = Periodic(lambda: print('test'), 1)
    try:
        print('Start')
        await p.start()
        await asyncio.sleep(3.1)

        print('Stop')
        await p.stop()
        await asyncio.sleep(3.1)

        print('Start')
        await p.start()
        await asyncio.sleep(3.1)
    finally:
        await p.stop()  # we should stop task finally


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

Start
test
test
test

Stop

Start
test
test
test

[Finished in 9.5s]

正如您在start上看到的,我们只是开始一个任务,它调用一些函数,并在无休止的循环中休眠一段时间。在stop我们只是取消了那个任务。注意,该任务应该在程序完成时停止。

还有一件更重要的事情,回调不应该花费太多时间来执行(否则它会冻结事件循环)。如果您计划调用一些长期运行的func,您可能需要to run it in executor

对于低于3.5的Python版本:

import asyncio

@asyncio.coroutine
def periodic():
    while True:
        print('periodic')
        yield from asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

对于Python3.5及更高版本:

import asyncio

async def periodic():
    while True:
        print('periodic')
        await asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

没有对定期调用的内置支持,没有

只需创建自己的调度程序循环即可休眠并执行任何计划的任务:

import math, time

async def scheduler():
    while True:
        # sleep until the next whole second
        now = time.time()
        await asyncio.sleep(math.ceil(now) - now)

        # execute any scheduled tasks
        await for task in scheduled_tasks(time.time()):
            await task()

迭代器应该生成在给定时间可以运行的任务。请注意,理论上,生成调度并启动所有任务可能需要超过1秒的时间;这里的想法是,调度程序生成自上次检查以来应该启动的所有任务。

相关问题 更多 >