<p>假设python和数据库运行在同一个系统中(也就是说,您不希望<code>async</code>任何网络调用),您有两个选项。在</p>
<ol>
<li><p>你已经在做什么(在执行器中运行)。它阻塞了db线程,但主线程仍然可以自由地执行其他操作。这不是纯粹的非阻塞,但是对于I/O阻塞情况,它是一个可以接受的解决方案,只需维护一个线程就可以了。</p></li>
<li><p>对于真正的非阻塞解决方案(可以在单个线程中运行而不阻塞),您必须有#1。对于每个fetch调用,<code>async</code>(回调)的本机支持,并且#2将其包装在自定义事件循环实现中。在这里,您将基循环子类化,并重写方法来集成db回调。例如,可以创建实现管道服务器的基本循环。数据库写入管道,python轮询管道。请参阅<code>asyncio</code>代码库中Proactor事件循环的实现。注意:我从未实现过任何自定义事件循环。</p></li>
</ol>
<p>我不熟悉leveldb,但是对于键值存储来说,不清楚这样的fetch回调和纯非阻塞实现是否有任何显著的好处。如果您在一个迭代器中获得多个回迁,这是您的主要问题,您可以使循环<code>async</code>(每次回迁仍然阻塞),并可以提高性能。下面是一个解释这一点的伪代码。在</p>
<pre><code>import asyncio
import random
import time
async def talk_to_db(d):
"""
blocking db iteration. sleep is the fetch function.
"""
for k, v in d.items():
time.sleep(1)
yield (f"{k}:{v}")
async def talk_to_db_async(d):
"""
real non-blocking db iteration. fetch (sleep) is native async here
"""
for k, v in d.items():
await asyncio.sleep(1)
yield (f"{k}:{v}")
async def talk_to_db_async_loop(d):
"""
semi-non-blocking db iteration. fetch is blocking, but the
loop is not.
"""
for k, v in d.items():
time.sleep(1)
yield (f"{k}:{v}")
await asyncio.sleep(0)
async def db_call_wrapper(db):
async for row in talk_to_db(db):
print(row)
async def db_call_wrapper_async(db):
async for row in talk_to_db_async(db):
print(row)
async def db_call_wrapper_async_loop(db):
async for row in talk_to_db_async_loop(db):
print(row)
async def func(i):
await asyncio.sleep(5)
print(f"done with {i}")
database = {i:random.randint(1,20) for i in range(20)}
async def main():
db_coro = db_call_wrapper(database)
coros = [func(i) for i in range(20)]
coros.append(db_coro)
await asyncio.gather(*coros)
async def main_async():
db_coro = db_call_wrapper_async(database)
coros = [func(i) for i in range(20)]
coros.append(db_coro)
await asyncio.gather(*coros)
async def main_async_loop():
db_coro = db_call_wrapper_async_loop(database)
coros = [func(i) for i in range(20)]
coros.append(db_coro)
await asyncio.gather(*coros)
# run the blocking db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# run the non-blocking db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main_async())
# run the non-blocking (loop only) db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main_async_loop())
</code></pre>
<p>这是你可以试试的。否则,我认为你目前的方法是相当有效的。我不认为BaseProxy可以给你一个异步acall API,它不知道如何处理来自数据库的回调。在</p>