如何在BaseProxy上调用方法时有效地使用asyncio?

2024-07-03 05:44:14 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在开发一个应用程序,它使用LevelDB,并为不同的任务使用多个长期存在的进程。在

由于LevelDB只允许单个进程维护数据库连接,所以我们所有的数据库访问都通过一个特殊的数据库进程进行。在

要从另一个进程访问数据库,我们使用BaseProxy。但是由于我们使用asyncio,我们的代理不应该阻止这些调用db进程的api,这些api最终会从db读取数据。因此,我们使用执行器在代理上实现api。在

    loop = asyncio.get_event_loop()

    return await loop.run_in_executor(
        thread_pool_executor,
        self._callmethod,
        method_name,
        args,
    )

虽然这样做很好,但我想知道是否有更好的替代方法来将_callmethod调用包装在ThreadPoolExecutor中。在

根据我的理解,BaseProxy调用DB进程是等待IO的典型例子,因此使用线程似乎是不必要的浪费。在

在完美世界中,我假设async _acallmethod存在于BaseProxy上,但不幸的是,API不存在。在

所以,我的问题基本上可以归结为:当使用BaseProxy时,有没有一种更有效的方法来代替在ThreadPoolExecutor中运行这些跨进程调用?在


Tags: 方法loopapiasyncio数据库应用程序代理db
3条回答

假设python和数据库运行在同一个系统中(也就是说,您不希望async任何网络调用),您有两个选项。在

  1. 你已经在做什么(在执行器中运行)。它阻塞了db线程,但主线程仍然可以自由地执行其他操作。这不是纯粹的非阻塞,但是对于I/O阻塞情况,它是一个可以接受的解决方案,只需维护一个线程就可以了。

  2. 对于真正的非阻塞解决方案(可以在单个线程中运行而不阻塞),您必须有#1。对于每个fetch调用,async(回调)的本机支持,并且#2将其包装在自定义事件循环实现中。在这里,您将基循环子类化,并重写方法来集成db回调。例如,可以创建实现管道服务器的基本循环。数据库写入管道,python轮询管道。请参阅asyncio代码库中Proactor事件循环的实现。注意:我从未实现过任何自定义事件循环。

我不熟悉leveldb,但是对于键值存储来说,不清楚这样的fetch回调和纯非阻塞实现是否有任何显著的好处。如果您在一个迭代器中获得多个回迁,这是您的主要问题,您可以使循环async(每次回迁仍然阻塞),并可以提高性能。下面是一个解释这一点的伪代码。在

import asyncio
import random
import time

async def talk_to_db(d):
    """ 
        blocking db iteration. sleep is the fetch function.
    """
    for k, v in d.items():
        time.sleep(1)
        yield (f"{k}:{v}")

async def talk_to_db_async(d):
    """ 
        real non-blocking db iteration. fetch (sleep) is native async here 
    """
    for k, v in d.items():
        await asyncio.sleep(1)
        yield (f"{k}:{v}")

async def talk_to_db_async_loop(d):
    """ 
        semi-non-blocking db iteration. fetch is blocking, but the
        loop is not.
    """
    for k, v in d.items():
        time.sleep(1)
        yield (f"{k}:{v}")
        await asyncio.sleep(0)

async def db_call_wrapper(db):
    async for row in talk_to_db(db):
        print(row)

async def db_call_wrapper_async(db):
    async for row in talk_to_db_async(db):
        print(row)

async def db_call_wrapper_async_loop(db):
    async for row in talk_to_db_async_loop(db):
        print(row)

async def func(i):
    await asyncio.sleep(5)
    print(f"done with {i}")

database = {i:random.randint(1,20) for i in range(20)}

async def main():
    db_coro = db_call_wrapper(database)
    coros = [func(i) for i in range(20)]
    coros.append(db_coro)
    await asyncio.gather(*coros)

async def main_async():
    db_coro = db_call_wrapper_async(database)
    coros = [func(i) for i in range(20)]
    coros.append(db_coro)
    await asyncio.gather(*coros)

async def main_async_loop():
    db_coro = db_call_wrapper_async_loop(database)
    coros = [func(i) for i in range(20)]
    coros.append(db_coro)
    await asyncio.gather(*coros)

# run the blocking db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# run the non-blocking db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main_async())

# run the non-blocking (loop only) db iteration
loop = asyncio.get_event_loop()
loop.run_until_complete(main_async_loop())

这是你可以试试的。否则,我认为你目前的方法是相当有效的。我不认为BaseProxy可以给你一个异步acall API,它不知道如何处理来自数据库的回调。在

线程池就是你想要的。aioprocessing提供了一些多处理的异步功能,但是它使用了您所建议的线程。如果没有公开真正异步多处理的方法,我建议对python提出一个问题。在

https://github.com/dano/aioprocessing

In most cases, this library makes blocking calls to multiprocessing methods asynchronous by executing the call in a ThreadPoolExecutor

不幸的是,多处理库不适合转换为asyncio,如果必须使用BaseProxy来处理IPC(进程间通信),那么您所能做的就是最好的了。在

虽然库在这里确实使用了阻塞I/O,但是您无法轻松地访问并重新处理阻塞部分以使用非阻塞原语。如果你坚持走这条路,你就必须修补或重写该库的内部实现细节,但作为内部实现细节,这些细节可能因Python的点发布而有所不同,这使得任何修补都很脆弱,而且容易因Python的小升级而中断。_callmethod方法是涉及线程、套接字或管道连接以及序列化程序的深层抽象层次的一部分。请参见^{}^{}。在

因此,您可以选择使用当前的方法(使用线程池执行器将BaseProxy._callmethod()推到另一个线程)or使用异步原语实现自己的IPC解决方案。您的中央数据库访问进程将充当其他进程作为客户机连接到的服务器,可以使用套接字或命名管道,对客户机请求和服务器响应使用约定的序列化方案。这就是multiprocessing为您实现的,但是您将使用^{} streams和任何最适合您的应用程序模式的序列化方案(例如pickle、JSON、protobuffers或其他完全不同的东西)来实现您自己的(更简单的)版本。在

相关问题 更多 >