Python3:如何向线程池提交异步函数?

2024-10-02 14:16:46 发布

您现在位置:Python中文网/ 问答频道 /正文

{cd1}和{cd2}函数都要使用。在

我的程序反复向线程池提交具有不同输入值的函数。在那个更大的函数中执行的最后一个任务序列可以是任意顺序的,我不关心返回值,只关心它们在将来的某个时刻执行。在

所以我试着这么做

async def startLoop():

    while 1:
        for item in clients:
            arrayOfFutures.append(await config.threadPool.submit(threadWork, obj))

        wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)

其中提交的函数是:

^{pr2}$

其中do_b和{}是异步的功能。The问题是我得到了一个错误:TypeError: object Future can't be used in 'await' expression,如果我删除wait,我会得到另一个错误,说我需要添加await。在

我想我可以让所有的东西都使用线程,但我真的不想这么做。在


Tags: 函数in程序顺序错误序列await线程
1条回答
网友
1楼 · 发布于 2024-10-02 14:16:46

我建议仔细阅读Python3的asyncio development guide,特别是“并发和多线程”部分。在

在您的示例中,主要的概念问题是事件循环是单线程的,因此在线程池中执行异步协同程序没有意义。事件循环和线程有几种交互方式:

  • 每个线程的事件循环。例如:

    async def threadWorkAsync(obj):
        b = do_something()
        if b:
            # Run a and b as concurrent tasks
            task_a = asyncio.create_task(do_a())
            task_b = asyncio.create_task(do_b())
            await task_a
            await task_b
    
    def threadWork(obj):
        # Create run loop for this thread and block until completion
        asyncio.run(threadWorkAsync())
    
    def startLoop():
        while 1:
            arrayOfFutures = []
            for item in clients:
                arrayOfFutures.append(config.threadPool.submit(threadWork, item))
    
            wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)
    
  • 在执行器中执行阻塞代码。这允许您使用异步期货,而不是上面提到的并发期货。

    async def startLoop():
        while 1:
            arrayOfFutures = []
            for item in clients:
                arrayOfFutures.append(asyncio.run_in_executor(
                    config.threadPool, threadWork, item))
    
            await asyncio.gather(*arrayOfFutures)
    
  • 使用线程安全函数跨线程向事件循环提交任务。例如,您可以在主线程的运行循环中运行所有异步协同程序,而不是为每个线程创建一个运行循环:

    def threadWork(obj, loop):
        b = do_something()
        if b:
            future_a = asyncio.run_coroutine_threadsafe(do_a())
            future_b = asyncio.run_coroutine_threadsafe(do_b())
            concurrent.futures.wait([future_a, future_b])
    
    async def startLoop():
        loop = asyncio.get_running_loop()
        while 1:
            arrayOfFutures = []
            for item in clients:
                arrayOfFutures.append(asyncio.run_in_executor(
                    config.threadPool, threadWork, item, loop))
    
            await asyncio.gather(*arrayOfFutures)
    

    注意:这是相当混乱的,所以我不推荐它,但是为了完整起见,我把它包括进来。

相关问题 更多 >

    热门问题