python3.6异步/等待仍然与fastAPI同步工作

2024-09-30 08:29:16 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个fastAPI应用程序,可以发布两个请求,其中一个更长(如果有帮助的话,它们是Elasticsearch查询,我正在使用AsyncElasticsearch模块,该模块已经返回协同路由)。这是我的尝试:

class my_module:
    search_object = AsyncElasticsearch(url, port)

    async def do_things(self):
        resp1 = await search_object.search() #the longer one
        print(check_resp1)
        resp2 = await search_object.search() #the shorter one
        print(check_resp2)
        process(resp2)
        process(resp1)
        do_synchronous_things()
        return thing

app = FastAPI()
@app.post("/")
async def service(user_input):
    result = await my_module.do_things()
    return results

我观察到的不是等待resp1,而是等到它到达check_resp1时,它已经是一个完整的响应,好像我根本没有使用异步

我是python异步的新手,我知道我的代码不会工作,但我不知道如何修复它。据我所知,当解释器看到await时,它会启动函数,然后继续,在这种情况下,它会立即发出下一个请求。我该怎么做


Tags: 模块thesearchasyncobjectmydefcheck
2条回答

是的,这是正确的。在结果准备好之前,协同程序不会继续。您可以使用asyncio.gather并发运行任务:

import asyncio


async def task(msg):
    print(f"START {msg}")
    await asyncio.sleep(1)
    print(f"END {msg}")

    return msg


async def main():
    await task("1")
    await task("2")

    results = await asyncio.gather(task("3"), task("4"))

    print(results)


if __name__ == "__main__":
    asyncio.run(main())

测试:

$ python test.py
START 1
END 1
START 2
END 2
START 3
START 4
END 3
END 4
['3', '4']

或者,您可以使用asyncio.as_completed获得最早的下一个结果:

for coro in asyncio.as_completed((task("5"), task("6"))):
    earliest_result = await coro
    print(earliest_result)

更新星期五4月2日09:25:33 UTC 2021:

asyncio.runPython 3.7+开始可用,在以前的版本中,您必须手动创建并启动循环:

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

解释

代码同步运行的原因是在do_things函数中,代码按如下方式执行:

  1. 安排search_object.search()执行
  2. 等待search_object.search()完成并获得结果
  3. 安排search_object.search()执行
  4. 等待search_object.search()完成并获得结果
  5. 执行(同步)process(resp2)
  6. 执行(同步)process(resp1)
  7. 执行(同步)do_synchronous_things()

您的目的是使步骤1和3在步骤2和步骤4之前执行。您可以使用unsync库轻松地实现它-这里是the documentation

你怎么能解决这个问题

from unsync import unsync

class my_module:
    search_object = AsyncElasticsearch(url, port)

    @unsync
    async def search1():
        return await search_object.search()

    @unsync
    async def search2():  # not sure if this is any different to search1
        return await search_object.search()

    async def do_things(self):
        task1, task2 = self.search1(), self.search2()  # schedule tasks
        resp1, resp2 = task1.result(), task2.result()  # wait till tasks are executed
        # you might also do similar trick with process function to run process(resp2) and process(resp1) concurrently
        process(resp2)
        process(resp1)
        do_synchronous_things()  # if this does not rely on resp1 and resp2 it might also be put into separate task to make the computation quicker. To do this use @unsync(cpu_bound=True) decorator
        return thing

app = FastAPI()
@app.post("/")
async def service(user_input):
    result = await my_module.do_things()
    return results

更多信息

如果您想了解有关asyncio和asyncronyous编程的更多信息,我建议您使用此tutorial。还有类似的情况,您提供了一些可能的解决方案,以使协同路由同时运行

很明显,我无法运行这段代码,所以您必须自己调试它

相关问题 更多 >

    热门问题