基于asyncio.Queue
的模拟生产-消费流:
import asyncio
async def produce(q: asyncio.Queue, task):
asyncio.create_task(q.put(task))
print(f'Produced {task}')
async def consume(q: asyncio.Queue):
while True:
task = await q.get()
if task > 2:
print(f'Cannot consume {task}')
raise ValueError(f'{task} too big')
print(f'Consumed {task}')
q.task_done()
async def main():
queue = asyncio.Queue()
consumers = [asyncio.create_task(consume(queue)) for _ in range(2)]
for i in range(10):
await asyncio.create_task(produce(queue, i))
await asyncio.wait([queue.join(), *consumers],
return_when=asyncio.FIRST_COMPLETED)
asyncio.run(main())
输出为:
Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Produced 4
Cannot consume 4
Produced 5
Produced 6
Produced 7
Produced 8
Produced 9
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<consume() done, defined at test.py:9> exception=ValueError('3 too big')>
Traceback (most recent call last):
File "test.py", line 14, in consume
raise ValueError(f'{task} too big')
ValueError: 3 too big
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<consume() done, defined at test.py:9> exception=ValueError('4 too big')>
Traceback (most recent call last):
File "test.py", line 14, in consume
raise ValueError(f'{task} too big')
ValueError: 4 too big
消费者提出异常后,是否有办法通知生产商停止生产?
上面的代码使用多个生产者如果“通知”机制只能在单一生产者模式下工作,也可以接受。
受user4815162342的建议启发
相关问题 更多 >
编程相关推荐