python和asyncua中的异步IO队列

2024-10-02 22:29:33 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用python 3.8.7中的asyncio和asyncua库,我想使用asyncio.Queue将数据从异步函数传递到主线程

但是,我遇到的问题是,我的队列在第一个await之后没有收到任何消息

下面是我的例子:

import threading
import time
import asyncio
from asyncua import Client
import aiohttp

async def main(queue1):
    queue1.put_nowait("main")
    client = Client(url='opc.tcp://localhost:4840/freeopcua/server/')
    #client = aiohttp.ClientSession()

    async with client:
        await queue1.put("in_async")
        queue1.put_nowait("in_async_2")

    queue1.put_nowait("after await")


def wrapper(queue1: asyncio.Queue):
    queue1.put_nowait("wrapper")
    asyncio.run(main(queue1))

if __name__ == '__main__':
    queue1 = asyncio.Queue()
    t = threading.Thread(target=wrapper, args=(queue1,))
    t.start()
    time.sleep(1)
    noEx = True
    while noEx:
        try:
            x = queue1.get_nowait()
            print(x)
        except asyncio.QueueEmpty:
            noEx = False

我得到:

wrapper
main

如果我使用其他一些异步库(示例中为aiohttp),那么一切都会按预期工作:

wrapper
main
in_async
in_async_2
after await

我已经验证了opc.tcp://localhost:4840/freeopcua/server/服务器上的minimal asyncua server可以工作-我可以使用本示例中的代码获取数据,但是队列似乎不起作用。有什么我遗漏的吗


Tags: inimportclientasyncioasyncaiohttpserverqueue
1条回答
网友
1楼 · 发布于 2024-10-02 22:29:33

这是我的猜测,但我认为你有比赛条件。以下是主线程中的while循环:

while noEx:
    try:
        x = queue1.get_nowait()
        print(x)
    except asyncio.QueueEmpty:
        noEx = False

一旦打印出某个内容,循环将尝试获取队列中的下一个内容。如果此时队列为空,则while循环将退出,不再打印任何内容

如果在辅助线程中建立连接的速度足够快,那么队列将填充接下来的两条消息;但是如果有一点延迟,您的主while循环可能已经在消息得到打印机会之前退出。我非常确定,在进入async with:块之前,可能存在(未知)网络延迟

我会尝试在print(x)语句之后插入一个明显的时间延迟,看看会发生什么(如果这解决了问题,您可以稍后改进代码)。或者更改while循环,使其永远运行,因为您始终可以使用control-C退出程序

相关问题 更多 >