Python中并发协程的无阻塞启动

2024-05-20 21:00:14 发布

您现在位置:Python中文网/ 问答频道 /正文

我想异步并发地执行任务。如果task2到达时task1正在运行,task2将立即启动,而无需等待task2完成。另外,我希望在协同程序的帮助下避免回调。在

下面是一个带有回调的并发解决方案:

def fibonacci(n):
    if n <= 1:
        return 1
    return fibonacci(n - 1) + fibonacci(n - 2)


class FibonacciCalculatorFuture:

    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)

    @staticmethod
    def calculate(n):
        print(f"started n={n}")
        return fibonacci(n)

    def run(self, n):
        future = self.pool.submit(self.calculate, n)
        future.add_done_callback(lambda f: print(f.result()))


if __name__ == '__main__':
    calculator = FibonacciCalculatorFuture()
    calculator.run(35)
    calculator.run(32)
    print("initial thread can continue its work")

其输出:

^{pr2}$

下面是我努力摆脱回拨:

class FibonacciCalculatorAsync:

    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        self.loop = asyncio.get_event_loop()

    @staticmethod
    def calculate_sync(n):
        print(f"started n={n}")
        return fibonacci(n)

    async def calculate(self, n):
        result = await self.loop.run_in_executor(self.pool, self.calculate_sync, n)
        print(result)

    def run(self, n):
        asyncio.ensure_future(self.calculate(n))


if __name__ == '__main__':
    calculator = FibonacciCalculatorAsync()
    calculator.run(35)
    calculator.run(32)
    calculator.loop.run_forever()
    print("initial thread can continue its work")

输出:

started n=35
started n=32
3524578
14930352

在这种情况下,初始线程不能超过loop.run_forever(),因此不能接受新任务。在

所以,我的问题是:有没有办法同时:

  • 同时执行任务
  • 能够接受新的任务并立即安排执行(包括已经运行的TAK)
  • 使用不带回调的协同程序和代码。在

Tags: runselfloopreturnifdeffutureresult
2条回答

问题的第二个要点可以通过在专用线程中运行asyncio并使用^{}来调度协同进程来满足。例如:

class FibonacciCalculatorAsync:
    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        self.loop = asyncio.get_event_loop()

    @staticmethod
    def calculate_sync(n):
        print(f"started n={n}")
        return fibonacci(n)

    async def calculate(self, n):
        result = await self.loop.run_in_executor(self.pool, self.calculate_sync, n)
        print(result)

    def run(self, n):
        asyncio.run_coroutine_threadsafe(self.calculate(n), self.loop)

    def start_loop(self):
        thr = threading.Thread(target=self.loop.run_forever)
        thr.daemon = True
        thr.start()


if __name__ == '__main__':
    calculator = FibonacciCalculatorAsync()
    calculator.start_loop()
    calculator.run(35)
    calculator.run(32)
    print("initial thread can continue its work")
    calculator.run(10)
    time.sleep(1)

loop.run_forever()确实将永远运行,即使其中没有任务。好消息是你不需要这个函数。要等待计算完成,请使用asyncio.gather

class FibonacciCalculatorAsync:

    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        # self.loop = asyncio.get_event_loop()

    ...

    async def calculate(self, n):
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(self.pool, self.calculate_sync, n)
        print(result)


async def main():
    calculator = FibonacciCalculatorAsync()
    fib_35 = asyncio.ensure_future(calculator.run(35))
    fib_32 = asyncio.ensure_future(calculator.run(32))

    print("initial thread can continue its work")
    ...

    # demand fibonaccy computation has ended
    await asyncio.gather(fib_35, fib_32)


if __name__ == '__main__':
    asyncio.run(main())

请注意这个循环是如何处理的-我改变了一些东西。如果您开始使用asyncio,我实际上建议所有的东西都有一个循环,而不是为更细粒度的任务创建循环。使用这种方法,您可以获得处理和同步任务的所有asyncia bells and whistles。在

另外,由于GIL的原因,不可能在ThreadPoolExecutor中并行化纯Python非IO代码。请记住这一点,在这种情况下最好使用流程池执行器。在

相关问题 更多 >