异步信号量和队列:如何在多个消费者之间共享队列

2024-10-02 02:30:39 发布

您现在位置:Python中文网/ 问答频道 /正文

我是asyncio新手,在与几个消费者共享来自生产者的队列时遇到一些问题。基本上,我想做的是添加尽可能多的消费者,每个消费者都有一个特定的限制(一个信号量)。在

在下面的代码中,将consumer2(C2)限制为1个条目不起作用。你能解释一下为什么吗?在

import asyncio
import random

filenames = {"A":1,"B":2,"C":1,"D":2,"E":1,"F":1,"G":2,"H":1,"I":2,"J":2}

async def producer(f, d, q):
    print("producing {}".format(f))
    t = random.randint(4,15)
    await asyncio.sleep(t)
    res = f,d
    await q.put(res)
    print(f,d, "queued in {} secs".format(t))

async def consumer1(item):
    print("got {} : C1".format(item))
    t = 10
    await asyncio.sleep(t)
    print(item, "processed by C1 in {} secs".format(t))

async def consumer2(item):
    async with asyncio.Semaphore(1):  # doesn't work here ...
        print("got {} : C2".format(item))
        t = 10
        await asyncio.sleep(t)
        print(item, "processed by C2 in {} secs".format(t))

async def manager(f, d):
    q = asyncio.Queue()
    asyncio.ensure_future(producer(f, d, q))
    item = await q.get()
    if item is None:
        pass
    if item[1] < 2:
        r1 = await asyncio.ensure_future(consumer1(item))
    else:
        r2 = await asyncio.ensure_future(consumer2(item))


async def main(filenames):
    jobs = [
        asyncio.ensure_future(manager(f, d))
        for (f, d) in filenames.items()
    ]
    await asyncio.gather(*jobs)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main(filenames))
finally:
    loop.close()

Tags: inloopasyncioformatasyncdef消费者future
1条回答
网友
1楼 · 发布于 2024-10-02 02:30:39

每次调用consumer2()时都会创建一个新的信号量。您应该在manager()中创建一个信号量实例,并在每次调用时将其作为参数传递给consumer()。在

相关问题 更多 >

    热门问题