我是asyncio新手,在与几个消费者共享来自生产者的队列时遇到一些问题。基本上,我想做的是添加尽可能多的消费者,每个消费者都有一个特定的限制(一个信号量)。在
在下面的代码中,将consumer2(C2)限制为1个条目不起作用。你能解释一下为什么吗?在
import asyncio
import random
filenames = {"A":1,"B":2,"C":1,"D":2,"E":1,"F":1,"G":2,"H":1,"I":2,"J":2}
async def producer(f, d, q):
print("producing {}".format(f))
t = random.randint(4,15)
await asyncio.sleep(t)
res = f,d
await q.put(res)
print(f,d, "queued in {} secs".format(t))
async def consumer1(item):
print("got {} : C1".format(item))
t = 10
await asyncio.sleep(t)
print(item, "processed by C1 in {} secs".format(t))
async def consumer2(item):
async with asyncio.Semaphore(1): # doesn't work here ...
print("got {} : C2".format(item))
t = 10
await asyncio.sleep(t)
print(item, "processed by C2 in {} secs".format(t))
async def manager(f, d):
q = asyncio.Queue()
asyncio.ensure_future(producer(f, d, q))
item = await q.get()
if item is None:
pass
if item[1] < 2:
r1 = await asyncio.ensure_future(consumer1(item))
else:
r2 = await asyncio.ensure_future(consumer2(item))
async def main(filenames):
jobs = [
asyncio.ensure_future(manager(f, d))
for (f, d) in filenames.items()
]
await asyncio.gather(*jobs)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(filenames))
finally:
loop.close()
每次调用consumer2()时都会创建一个新的信号量。您应该在manager()中创建一个信号量实例,并在每次调用时将其作为参数传递给consumer()。在
相关问题 更多 >
编程相关推荐