下面的代码显示了我如何测试Redis流函数
我发现具有相同使用者名称的不同进程正在竞争使用同一流中的消息。在我看来,如果这种性能是正常的,Redis不应该设计一个函数来指定使用者名称
我的理解有什么问题吗?还是我用错了方法
import asyncio
import aioredis
# consumer with name "a", subscribing two streams
async def consume_a():
try:
while True:
redis = aioredis.from_url("redis://localhost:36379")
res = await redis.xreadgroup(
"test_consumer_group",
"consumer_a",
streams={"test_stream": ">", "test_stream_1": ">"},
count=1,
block=10000,
noack=True,
)
print(res)
finally:
await redis.close()
名为“b”的消费者,订阅两条流
async def consume_b():
try:
while True:
redis = aioredis.from_url("redis://localhost:36379")
res = await redis.xreadgroup(
"test_consumer_group",
"consumer_b",
streams={"test_stream": ">", "test_stream_1": ">"},
count=1,
block=10000,
noack=True,
)
print(res)
finally:
await redis.close()
在运行脚本之前创建组
async def config_group_0():
try:
redis = aioredis.from_url("redis://localhost:36379")
res = await redis.xgroup_create("test_stream", "test_consumer_group")
print(res)
finally:
await redis.close()
async def config_group_1():
try:
redis = aioredis.from_url("redis://localhost:36379")
res = await redis.xgroup_create("test_stream_1", "test_consumer_group")
print(res)
finally:
await redis.close()
生产者
async def produce_0():
try:
redis = aioredis.from_url("redis://localhost:36379")
i = 0
while i < 100:
res = await redis.xadd(
"test_stream",
{"domain_name": "test_domain_name_0", "sid": 0},
maxlen=5,
)
print(res)
i += 1
finally:
await redis.close()
async def produce_1():
try:
redis = aioredis.from_url("redis://localhost:36379")
i = 0
while i < 100:
res = await redis.xadd(
"test_stream_1",
{"domain_name": "test_domain_name_1", "sid": 1},
maxlen=2,
)
print(res)
i += 1
finally:
await redis.close()
测试代码
if __name__ == "__main__":
# two coroutines consume messages from two streams with the same consumer name
asyncio.run(asyncio.gather(consume_a(), consume_a(), produce_0(), produce_1()))
基于Redis文档:
有关更多信息,请阅读以下文档:
相关问题 更多 >
编程相关推荐