当消费者在 asyncio.Queue 生产-消费(produce-consume)流中引发异常时,通知生产者停止生产

2024-06-15 02:17:53 发布

您现在位置:Python中文网/ 问答频道 /正文

基于asyncio.Queue的模拟生产-消费流:

import asyncio


async def produce(q: asyncio.Queue, task):
    """Put ``task`` on ``q`` and report it once it is really enqueued.

    The put is awaited directly instead of being handed to a
    fire-and-forget ``asyncio.create_task``: an unreferenced task can be
    dropped or cancelled before it ever runs, and "Produced" would be
    printed for an item that is not in the queue yet.

    Args:
        q: Queue shared with the consumers.
        task: The item to enqueue.
    """
    await q.put(task)
    print(f'Produced {task}')


async def consume(q: asyncio.Queue):
    """Drain ``q`` forever, failing on the first item greater than 2.

    Accepted items are acknowledged with ``task_done()``; a rejected item
    is reported and ends the consumer without being acknowledged.

    Raises:
        ValueError: When a fetched item is greater than 2.
    """
    while True:
        item = await q.get()
        too_big = item > 2
        if not too_big:
            print(f'Consumed {item}')
            q.task_done()
            continue
        print(f'Cannot consume {item}')
        raise ValueError(f'{item} too big')


async def main():
    """Start two consumers, produce ten items, then wait for an outcome.

    Returns as soon as either the queue has been fully processed or one of
    the consumer tasks has finished, whichever happens first. Tasks that
    are still running at that point are cancelled before returning.
    """
    queue = asyncio.Queue()
    consumers = [asyncio.create_task(consume(queue)) for _ in range(2)]
    for i in range(10):
        await asyncio.create_task(produce(queue, i))
    # asyncio.wait() must be given tasks/futures: passing the bare
    # queue.join() coroutine is deprecated since Python 3.8 and raises
    # TypeError on 3.11+.
    joiner = asyncio.create_task(queue.join())
    _, pending = await asyncio.wait([joiner, *consumers],
                                    return_when=asyncio.FIRST_COMPLETED)
    # Stop whatever is still running (the join waiter or a surviving
    # consumer) and wait for the cancellations to take effect.
    for leftover in pending:
        leftover.cancel()
    await asyncio.gather(*pending, return_exceptions=True)


if __name__ == '__main__':
    # Run the demo only when executed as a script, not when imported.
    asyncio.run(main())

输出为:

Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Produced 4
Cannot consume 4
Produced 5
Produced 6
Produced 7
Produced 8
Produced 9
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<consume() done, defined at test.py:9> exception=ValueError('3 too big')>
Traceback (most recent call last):
  File "test.py", line 14, in consume
    raise ValueError(f'{task} too big')
ValueError: 3 too big
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<consume() done, defined at test.py:9> exception=ValueError('4 too big')>
Traceback (most recent call last):
  File "test.py", line 14, in consume
    raise ValueError(f'{task} too big')
ValueError: 4 too big

消费者引发异常后,是否有办法通知生产者停止生产?
上面的代码使用了多个生产者。如果“通知”机制只能在单一生产者模式下工作,也可以接受。


Tags: inpytestasynciotaskqueueexceptiontoo
1条回答
网友
1楼 · 发布于 2024-06-15 02:17:53

受 user4815162342 的建议启发:

Set a global Boolean variable when a consumer raises an exception, and check it in the producer.

import asyncio

# Shared stop flag: a consumer sets it to True just before raising, and the
# producing side checks it to decide whether to keep producing.
stop = False


async def single_produce(q: asyncio.Queue):
    """Produce the items 0..9 into ``q``, giving up once ``stop`` is set.

    Before each item the producer sleeps for 1 ms and then checks the
    module-level ``stop`` flag. If the flag is set, production ends
    without enqueuing the current item.
    """
    for item in range(10):
        await asyncio.sleep(0.001)
        if stop:
            # A consumer has failed; nothing more should be produced.
            return
        await q.put(item)
        print(f'Produced {item}')


async def multi_produce(q: asyncio.Queue, task):
    """Produce the single item ``task`` into ``q`` unless ``stop`` is set.

    Mirrors ``single_produce``: the module-level ``stop`` flag is checked
    after the 1 ms sleep, so an item is not enqueued if a consumer failed
    while this producer was sleeping.

    Args:
        q: Queue shared with the consumers.
        task: The item to enqueue.
    """
    await asyncio.sleep(0.001)
    if stop:
        # A consumer has already failed; skip this item.
        return
    await q.put(task)
    print(f'Produced {task}')


async def consume(q: asyncio.Queue):
    """Drain ``q`` forever; on an item greater than 2, set ``stop`` and fail.

    Accepted items are acknowledged with ``task_done()``. A rejected item
    sets the module-level ``stop`` flag, is reported, and ends the consumer
    without being acknowledged.

    Raises:
        ValueError: When a fetched item is greater than 2.
    """
    global stop
    while True:
        item = await q.get()
        too_big = item > 2
        if not too_big:
            print(f'Consumed {item}')
            q.task_done()
            continue
        # Raise the flag before failing so producers can see it.
        stop = True
        print(f'Cannot consume {item}')
        raise ValueError(f'{item} too big')


async def main(mode):
    """Run the producer/consumer demo in the requested producer mode.

    Two consumers are started on a queue of size 1. Production stops
    early once a consumer has set the module-level ``stop`` flag. The
    function returns when the queue has been fully processed or a consumer
    task has finished, whichever is first; tasks still running at that
    point are cancelled.

    Args:
        mode: ``'single'`` runs one producer for all items; ``'multiple'``
            runs one short-lived producer task per item. Any other value
            produces nothing.
    """
    global stop
    # Clear a flag left over from an earlier run so main() can be re-run.
    stop = False
    queue = asyncio.Queue(1)
    consumers = [asyncio.create_task(consume(queue)) for _ in range(2)]
    if mode == 'single':
        print('single producer')
        await asyncio.create_task(single_produce(queue))
    elif mode == 'multiple':
        print('multiple producers')
        for i in range(10):
            if stop:
                break
            await asyncio.create_task(multi_produce(queue, i))
    # asyncio.wait() must be given tasks/futures: passing the bare
    # queue.join() coroutine is deprecated since Python 3.8 and raises
    # TypeError on 3.11+.
    joiner = asyncio.create_task(queue.join())
    _, pending = await asyncio.wait([joiner, *consumers],
                                    return_when=asyncio.FIRST_COMPLETED)
    # Stop whatever is still running (the join waiter or a surviving
    # consumer) and wait for the cancellations to take effect.
    for leftover in pending:
        leftover.cancel()
    await asyncio.gather(*pending, return_exceptions=True)


if __name__ == '__main__':
    # Run the demo only when executed as a script, not when imported.
    # Pass 'multiple' instead of 'single' to use one producer task per item.
    asyncio.run(main('single'))
    # asyncio.run(main('multiple'))

相关问题 更多 >