如何使用Asyncio控制在任何给定时间有多少任务实例在工作?

2024-09-27 00:13:42 发布

您现在位置:Python中文网/ 问答频道 /正文

我已经用Asyncio创建了两个队列系统

Responder获取链接列表,向每个链接请求响应,并将结果放入队列Parser从队列中获取响应,对其进行解析,然后将其添加到另一个队列中Submitter从队列中获取已解析的对象,并将其提交到数据库

下面的代码显示了我如何创建任务。对于SubmitterParser我创建了100个实例。问题似乎是,一旦Submitter达到100个实例,提交队列就开始备份。它基本上停止了它的工作。不再提交任何内容ResponderParser通常会继续进行

一旦一个submitter完成了它的工作,我如何回收它?我不想为列表中的每个链接都创建一个responder。这是我代码末尾的部分,我不完全理解-for s in submitters: s.cancel() 这会在一切都完成后,或实例完成其工作后,杀死我的实例

async def bulk_submit(not_submitted: set, **kwargs):
    parse_queue = asyncio.Queue()
    submit_queue = asyncio.Queue()
    headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36"}

    timeout = aiohttp.ClientTimeout(total=60*60)
    async with ClientSession(headers=headers, timeout=timeout) as session:
        tasks=[]
        i=0
        for link in not_submitted:
            i+=1
            tasks.append(
                responder(f'RESPONDER-{i}',url=link, session=session, parse_queue=parse_queue, **kwargs)
            )

        parsers = [asyncio.create_task(parser(f'PARSER-{n}', parse_queue=parse_queue, submit_queue=submit_queue)) for n in range (100)]
        submitters = [asyncio.create_task(submitter(f'SUBMITTER-{n}', submit_queue=submit_queue,)) for n in range (100)]


        await asyncio.gather(*tasks)
        await parse_queue.join()
        await episode_queue.join()
        await submit_queue.join()
        for s in submitters:
            s.cancel()
        for p in parsers:
            p.cancel()


Tags: 实例inasyncioparserfor队列queueparse
1条回答
网友
1楼 · 发布于 2024-09-27 00:13:42

The issue seems to be, that once Submitter gets to 100 instances, that's it - the submit queue just starts to back up. [...]

Once a submitter has done it's job, how do I recycle it?

您没有向我们展示submitter的代码,因此很难告诉我们如何解决问题,甚至很难说您正在精确地解决什么问题。根据您显示的代码,可以猜测提交者只是在处理单个队列项目后返回。实际上,您正在创建100个并行运行的提交程序,但对队列的访问会将它们序列化。当它们中的每一个都完成其工作时,没有其他人来排空提交队列,工作停止

要解决这个问题,您不需要回收提交者,您只需要将其更改为保持队列项目的出列,而不是在获得一个项目后退出。应该是这样的:

async def submitter(name, submit_queue):
    while True:
        item = await submit_queue.get()
        ... process the item ...

使用此设置,您不需要创建100个并行运行的提交程序,一个就足够了(除非您实际上想要某种程度的并行性,也就是说,在这种情况下,您可以创建任意数量的并行性。)

It's the part at the end of my code which I don't fully understand - for s in submitters: s.cancel() This kills my instances after everything is done, or after the instance has done its job?

我怀疑在您的代码中取消是不可操作的,因为在调用cancel()时,您所有的submitter协同程序都已完成(取消已完成的任务被忽略)

通常情况下,这种想法是在工作完成后,不再需要闲置的工人,然后将他们杀掉。例如,如果submitter包含如上所示的无限循环,则取消操作将prevent it避免在bulk_submit返回后永远等待新的队列项目(并且从未接收到它们)

相关问题 更多 >

    热门问题