如何异步映射/过滤异步iterable?

2024-09-30 14:16:49 发布

您现在位置:Python中文网/ 问答频道 /正文

假设我有一个异步iterable,我可以使用async for来传递它,然后如何将它映射和过滤到一个新的异步迭代器?下面的代码是我如何对同步iterable执行相同操作的改编,但它不能工作,因为yield不允许在async defs中使用

async def mapfilter(aiterable, p, func):
    async for payload in aiterable:
        if p(payload):

            # This part isn't allowed, but hopefully it should be clear
            # what I'm trying to accomplish.
            yield func(payload)

Tags: 代码inforasyncifdefthisiterable
2条回答

一个recently published PEP draft (PEP 525),它的支持是scheduled for Python 3.6,它建议允许使用与您所提出的相同语法的异步生成器。在

同时,如果您不想处理异步迭代器样板,也可以使用CryingCyclops在其注释中提到的asyncio_extras库。在

the docs

@async_generator
async def mygenerator(websites):
    for website in websites:
        page = await http_fetch(website)
        await yield_async(page)

async def fetch_pages():
    websites = ('http://foo.bar', 'http://example.org')
    async for sanitized_page in mygenerator(websites):
        print(sanitized_page)

还有一个async_generator library支持yield from构造。在

can't在协程中使用yield。要实现您的想法,我看到的唯一方法是实现Asynchronous Iterator。如果我是对的,像这样:

class MapFilter:
    def __init__(self, aiterable, p, func):
        self.aiterable = aiterable
        self.p = p
        self.func = func

    async def __aiter__(self):
        return self

    async def __anext__(self):
        while True:
            payload = await self.aiterable.__anext__()  # StopAsyncIteration would be raise here on no new values
            if self.p(payload):
                return self.func(payload)

让我们来测试一下。下面是helper arange类的完整示例(我从here获取):

^{pr2}$

输出:

6
8

相关问题 更多 >

    热门问题