如何测试向接收器发送数据的Faust代理?

2024-09-30 08:18:02 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试使用pytest为Faust应用程序编写单元测试。我已经参考了文档here,但它没有提到当我的Faust代理将数据发送到接收器时应该做什么

如果没有接收器,我的测试工作正常,但当我使用接收器时,我会出现以下错误:

RuntimeError: Task <Task pending name='Task-2' coro=<Agent._execute_actor() running at /Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/faust/agents/agent.py:647> cb=[<TaskWakeupMethWrapper object at 0x7fc28967c5b0>()]> got Future <Future pending> attached to a different loop
INFO     faust.agents.agent:logging.py:265 [^-AgentTestWrapper: ml_exporter.processDetections]: Stopping...

我尝试过各种方法:例如修补Faust应用程序中发送数据到接收器的装饰器,尝试在没有装饰器的情况下测试我的功能(通过尝试绕过装饰器),修补Faust应用程序中的接收器参数,使其具有无值(因此它不会将我的数据发送到接收器),等等。我在这些方面都没有运气

这是我的浮士德经纪人:

app = faust.App('ml-exporter', broker=dx_broker, value_serializer='json')

detection_topic = app.topic(dx_topic)
graph_topic = app.topic(gwh_topic)

@app.agent(detection_topic, sink=[graph_topic])
async def processDetections(detections):
    detection_count = 0
    async for detection in detections:
        detection_count += 1
        # r.set("detection_count", detection_count)
        yield detection

以下是我当前的测试代码:

import ml_exporter

patch('ml_exporter.graph_topic', None)

def create_app():
    return faust.App('ml-exporter', value_serializer='json')

@pytest.fixture()
def test_app(event_loop):
    app = create_app()
    app.finalize()
    app.flow_control.resume()
    return app

@pytest.mark.asyncio()
async def test_processDetections(test_app):
    async with ml_exporter.processDetections.test_context() as agent:
        event = await agent.put('hey')
        assert agent.results[event.message.offset] == 'hey'

当我运行这个测试时,我得到了与上面提到的相同的错误。有没有办法成功测试我的Faust应用程序

谢谢大家!


Tags: testapp应用程序asynctopicdefcountml
1条回答
网友
1楼 · 发布于 2024-09-30 08:18:02

强制pytest使用Faust的asyncio事件循环作为默认全局循环。将以下夹具添加到测试代码中:

@pytest.mark.asyncio()
@pytest.fixture()
def event_loop():
    yield app.loop

pytest documentation中所述:

The event_loop fixture can be easily overridden in any of the standard pytest locations (e.g. directly in the test file, or in conftest.py) to use a non-default event loop. If the pytest.mark.asyncio marker is applied, a pytest hook will ensure the produced loop is set as the default global loop.

相关问题 更多 >

    热门问题