如何将异步生成器合并为Python 3.5+的原生生成器

2024-06-13 15:19:30 发布

您现在位置:Python中文网/ 问答频道 /正文

我很难组合异步生成器并实际运行它们。这是因为我发现运行它们的唯一方法是通过一个事件循环来返回iterable而不是生成器。让我用一个简单的例子来说明这一点:

假设我有一个google_search函数,它通过抓取来搜索google(我不是故意使用API)。它接受一个搜索字符串并返回搜索结果的生成器。此生成器不会在页面结束时结束,函数将继续转到下一页。因此,google_search函数返回一个可能几乎没完没了的生成器(从技术上讲,它总是结束的,但通常在google上搜索时,你可以获得数百万次的点击率)

def google_search(search_string):
    # Basically uses requests/aiohttp and beautifulsoup
    # to parse the resulting html and yield search results
    # Assume this function works
    ......

好的,现在我想创建一个函数,它允许我迭代多个google_搜索生成器。我想要这样的东西:

^{pr2}$

这样我就可以使用一个简单的for循环来展开google_搜索并得到结果。上面的代码运行得很好,但是对于任何数量相当大的搜索来说都是非常缓慢的。代码发送第一次搜索的请求,然后发送第二次搜索的请求,直到最后,它产生结果。我想加快速度。我的第一个想法是将google_搜索改为一个异步函数(我使用的是python3.6.3,可以使用await/async等)。这将创建一个异步生成器,这很好,但我只能在另一个异步函数或事件循环中运行它。并在事件循环中使用run_运行它,直到\u完成(循环聚集(…))返回结果列表,而不是普通生成器,这将破坏此目的,因为可能有太多的搜索结果无法保存在列表中。在

我怎样才能使google_搜索功能更快(最好使用异步代码,但任何东西都是受欢迎的)异步执行请求,而它仍然是一个普通的生成器? 提前谢谢!在


Tags: and方法函数字符串代码api列表search
3条回答

我将把我之前编写的解决方案粘贴在这里,因为我总是以这个问题结束,只是为了记住我以前已经解决过这个问题。在

async def iterator_merge(iterators: typing.Dict[typing.AsyncIterator, typing.Optional[asyncio.Future]]):
while iterators:
    for iterator, value in list(iterators.items()):
        if not value:
            iterators[iterator] = asyncio.ensure_future(iterator.__anext__())

    tasks, _ = await asyncio.wait(iterators.values(), return_when=asyncio.FIRST_COMPLETED)
    for task in tasks:
        # We send the result up
        try:
            res = task.result()
            yield res
        except StopAsyncIteration:
            # We remove the task from the list
            for it, old_next in list(iterators.items()):
                if task is old_next:
                    logger.debug(f'Iterator {it} finished consuming')
                    iterators.pop(it)
        else:
            # We remove the task from the key
            for it, old_next in list(iterators.items()):
                if task is old_next:
                    iterators[it] = None

它有输入注释,但我认为这是一个很好的解决方案。它的意思是以异步生成器作为键调用它,如果您有任何要等待的话,还有一个未来。在

^{pr2}$

你可以找到我怎么用的github.com/txomon/abot. 在

接受的答案等待来自每个异步生成器的一个结果,然后再调用生成器。如果数据不以同样的速度出现,那可能是个问题。下面的解决方案采用多个异步iterable(generator or not),并在多个协程中同时迭代所有这些异步iterable。每个协同程序将结果放入asyncio.Queue,然后由客户端代码迭代:

迭代器代码:

import asyncio
from async_timeout import timeout

class MergeAsyncIterator:
    def __init__(self, *it, timeout=60, maxsize=0):
        self._it = [self.iter_coro(i) for i in it]
        self.timeout = timeout
        self._futures = []
        self._queue = asyncio.Queue(maxsize=maxsize)

    def __aiter__(self):
        for it in self._it:
            f = asyncio.ensure_future(it)
            self._futures.append(f)
        return self

    async def __anext__(self):
        if all(f.done() for f in self._futures) and self._queue.empty():
            raise StopAsyncIteration
        with timeout(self.timeout):
            try:
                return await self._queue.get()
            except asyncio.CancelledError:
                raise StopAsyncIteration

    def iter_coro(self, it):
        if not hasattr(it, '__aiter__'):
            raise ValueError('Object passed must be an AsyncIterable')
        return self.aiter_to_queue(it)

    async def aiter_to_queue(self, ait):
        async for i in ait:
            await self._queue.put(i)
            await asyncio.sleep(0)

示例客户代码:

^{pr2}$

输出:

14:48:28.638975 ('a', 1)
14:48:29.638822 ('b', 2)
14:48:29.741651 ('b', 0)
14:48:29.742013 ('a', 1)
14:48:30.639588 ('c', 3)
14:48:31.742705 ('c', 1)
14:48:31.847440 ('b', 2)
14:48:31.847828 ('a', 2)
14:48:31.847960 ('c', 0)
14:48:32.950166 ('c', 1)
14:48:33.948791 ('a', 2)
14:48:34.949339 ('b', 3)
14:48:35.055487 ('c', 2)
14:48:35.055928 ('c', 'DONE')
14:48:36.049977 ('a', 2)
14:48:36.050481 ('a', 'DONE')
14:48:37.050415 ('b', 2)
14:48:37.050966 ('b', 'DONE')

注:上面的代码使用^{}第三方库。
PS2:^{}库与上面的代码相同,而且更多。在

def google_search(search_string):
    # Basically uses requests/aiohttp and beautifulsoup

这是普通同步发电机。您可以在其中使用requests,但如果您想使用异步aiohttp,则需要使用async def定义{a1}。在

在多个异步生成器上迭代会更有趣。不能使用普通的zip,因为它与普通iterable一起工作,而不是异步iterable。因此,您应该实现自己的(这也将支持并发迭代)。在

我做了一个小原型,我想它能满足你的需要:

^{pr2}$

输出:

first 0 1514759561
second 0 1514759561
third 0 1514759561
first 1 1514759562
second 1 1514759562
third 1 1514759562
first 2 1514759564
second 2 1514759564
third 2 1514759564
first 3 1514759567
second 3 1514759567
third 3 1514759567

时间显示不同的搜索同时运行。在

相关问题 更多 >