我已经用Asyncio创建了两个队列系统
Responder
获取链接列表,向每个链接请求响应,并将结果放入队列Parser
从队列中获取响应,对其进行解析,然后将其添加到另一个队列中Submitter
从队列中获取已解析的对象,并将其提交到数据库
下面的代码显示了我如何创建任务。对于Submitter
和Parser
我创建了100个实例。问题似乎是,一旦Submitter
达到100个实例,提交队列就开始备份。它基本上停止了它的工作。不再提交任何内容Responder
和Parser
通常会继续进行
一旦一个submitter
完成了它的工作,我如何回收它?我不想为列表中的每个链接都创建一个responder
。这是我代码末尾的部分,我不完全理解-for s in submitters:
s.cancel()
这会在一切都完成后,或实例完成其工作后,杀死我的实例
async def bulk_submit(not_submitted: set, **kwargs):
parse_queue = asyncio.Queue()
submit_queue = asyncio.Queue()
headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36"}
timeout = aiohttp.ClientTimeout(total=60*60)
async with ClientSession(headers=headers, timeout=timeout) as session:
tasks=[]
i=0
for link in not_submitted:
i+=1
tasks.append(
responder(f'RESPONDER-{i}',url=link, session=session, parse_queue=parse_queue, **kwargs)
)
parsers = [asyncio.create_task(parser(f'PARSER-{n}', parse_queue=parse_queue, submit_queue=submit_queue)) for n in range (100)]
submitters = [asyncio.create_task(submitter(f'SUBMITTER-{n}', submit_queue=submit_queue,)) for n in range (100)]
await asyncio.gather(*tasks)
await parse_queue.join()
await episode_queue.join()
await submit_queue.join()
for s in submitters:
s.cancel()
for p in parsers:
p.cancel()
您没有向我们展示
submitter
的代码,因此很难告诉我们如何解决问题,甚至很难说您正在精确地解决什么问题。根据您显示的代码,可以猜测提交者只是在处理单个队列项目后返回。实际上,您正在创建100个并行运行的提交程序,但对队列的访问会将它们序列化。当它们中的每一个都完成其工作时,没有其他人来排空提交队列,工作停止要解决这个问题,您不需要回收提交者,您只需要将其更改为保持队列项目的出列,而不是在获得一个项目后退出。应该是这样的:
使用此设置,您不需要创建100个并行运行的提交程序,一个就足够了(除非您实际上想要某种程度的并行性,也就是说,在这种情况下,您可以创建任意数量的并行性。)
我怀疑在您的代码中取消是不可操作的,因为在调用
cancel()
时,您所有的submitter
协同程序都已完成(取消已完成的任务被忽略)通常情况下,这种想法是在工作完成后,不再需要闲置的工人,然后将他们杀掉。例如,如果
submitter
包含如上所示的无限循环,则取消操作将prevent it避免在bulk_submit
返回后永远等待新的队列项目(并且从未接收到它们)相关问题 更多 >
编程相关推荐