与uvloop等效的异步事件循环

2024-05-10 13:12:06 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个使用asyncio的协同例程方法的事件循环。在

我热衷于使用uvloop来寻找与下面的示例类似的东西。在

下面是一个简单的asyncio事件循环示例:

import asyncio

async def read(**kwargs):
    oid = kwargs.get('oid', '0.0.0.0.0.0')
    time = kwargs.get('time', 1)
    try:
        print('start: ' + oid)
    except Exception as exc:
        print(exc)
    finally:
        await asyncio.sleep(time)
        print('terminate: ' + oid)


def event_loop(configs):
    loop = asyncio.get_event_loop()

    for conf in configs:
        asyncio.ensure_future(read(oid=conf['oid'], time=conf['time']))

    return loop

if __name__ == '__main__':
    snmp_configurations = [
        {'time': 5, 'oid': '1.3.6.3.2.4'},
        {'time': 6, 'oid': '1.3.6.3.5.8'},
    ]  # TODO :: DUMMY
    loop = event_loop(snmp_configurations)
    try:
        loop.run_forever()
    except keyboardInterrupt:
        pass
    finally:
        print("Closing Loop")
        loop.close()

问题:

  • 如何使用uvloop来修改上面的代码片段?

  • 下面的更改对于使用uvloop具有更高的性能是否正确?在

    import uvloop
    
    def event_loop(configs):
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())  # TODO  :: uvloop.
        loop = asyncio.get_event_loop()
    
     `   for conf in configs:
            asyncio.ensure_future(read(oid=conf['oid'], time=conf['time']))
    
        return loop
    

[注意]:

  • uvloop声明使asyncio 2-4x更快。在

Tags: loopeventasyncio示例readgetuvlooptime
1条回答
网友
1楼 · 发布于 2024-05-10 13:12:06

只需在调用asyncio.get_event_loop()之前设置事件循环策略。在

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

async def read(**kwargs):
    oid = kwargs.get('oid', '0.0.0.0.0.0')
    time = kwargs.get('time', 1)
    try:
        print('start: ' + oid)
    except Exception as exc:
        print(exc)
    finally:
        await asyncio.sleep(time)
        print('terminate: ' + oid)


def event_loop(configs):
    loop = asyncio.get_event_loop()

    for conf in configs:
        asyncio.ensure_future(read(oid=conf['oid'], time=conf['time']))

    return loop

if __name__ == '__main__':
    snmp_configurations = [
        {'time': 5, 'oid': '1.3.6.3.2.4'},
        {'time': 6, 'oid': '1.3.6.3.5.8'},
    ]  # TODO :: DUMMY
    loop = event_loop(snmp_configurations)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        print("Closing Loop")
        loop.close()

是的,这个代码是正确的。您可以在导入后设置事件循环策略。在

^{pr2}$

相关问题 更多 >