将python asyncio与pyee事件发射器组合

2024-10-03 00:27:38 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图使用来自pyee libraryAsyncIOEventEmitter,但没有成功。由于某些原因,发出的事件“Hi”永远不会到达async_handler以完成异步IO的未来。我也没有在网上找到合适的例子。此外,我尝试提供当前事件并为AsyncIOEventEmitter使用新的事件循环,但两者都产生相同的结果

有人能帮我吗?下面的单元测试示例:

import asyncio
import logging
import pytest
from pyee import AsyncIOEventEmitter

LOG = logging.getLogger(__name__)

@pytest.mark.asyncio
async def test_setup(event_loop):
    LOG.info("1 - start")
    event_emitter = AsyncIOEventEmitter(asyncio.new_event_loop())

    # Create a new Future object.
    future_result = event_loop.create_future()
    LOG.info("2 - emit event")
    event_emitter.emit("event", "Hi")

    @event_emitter.on("event")
    async def async_handler(message):
        LOG.info(">>> %s", message)
        future_result.set_result(message)
        return future_result

    # Wait until *future_result* has a result and print it.
    LOG.info(await future_result)

谢谢


Tags: importinfoloopeventlogasynciomessageasync
1条回答
网友
1楼 · 发布于 2024-10-03 00:27:38

好的,我想好了,async_handler方法必须在测试的前面定义

这一点现在起作用了:

"""Event emitter playground"""
import asyncio
import logging
import pytest
from pyee import AsyncIOEventEmitter

LOG = logging.getLogger(__name__)


@pytest.mark.asyncio
async def test_setup(event_loop):
    """Receive event from emitter and complete future!"""
    LOG.info("1 - start")
    event_emitter = AsyncIOEventEmitter(asyncio.new_event_loop())

    @event_emitter.on("event")
    def async_handler(message):
        LOG.info(">>> %s", message)
        future_result.set_result(message)

    future_result = event_loop.create_future()
    LOG.info("2 - emit event")
    event_emitter.emit("event", "Hi")

    LOG.info(await future_result)

相关问题 更多 >