为什么redis stream用户应在同一用户组中指定单个名称?

2024-06-16 03:24:28 发布

您现在位置:Python中文网/ 问答频道 /正文

下面的代码显示了我如何测试Redis流函数

我发现具有相同使用者名称的不同进程正在竞争使用同一流中的消息。在我看来,如果这种性能是正常的,Redis不应该设计一个函数来指定使用者名称

我的理解有什么问题吗?还是我用错了方法

import asyncio
import aioredis

# consumer with name "a", subscribing two streams
async def consume_a():
    try:
        while True:
            redis = aioredis.from_url("redis://localhost:36379")
            res = await redis.xreadgroup(
                "test_consumer_group",
                "consumer_a",
                streams={"test_stream": ">", "test_stream_1": ">"},
                count=1,
                block=10000,
                noack=True,
            )
            print(res)
    finally:
        await redis.close()

名为“b”的消费者,订阅两条流

async def consume_b():
    try:
        while True:
            redis = aioredis.from_url("redis://localhost:36379")
            res = await redis.xreadgroup(
                "test_consumer_group",
                "consumer_b",
                streams={"test_stream": ">", "test_stream_1": ">"},
                count=1,
                block=10000,
                noack=True,
            )
            print(res)
    finally:
        await redis.close()

在运行脚本之前创建组

async def config_group_0():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        res = await redis.xgroup_create("test_stream", "test_consumer_group")
        print(res)
    finally:
        await redis.close()

async def config_group_1():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        res = await redis.xgroup_create("test_stream_1", "test_consumer_group")
        print(res)
    finally:
        await redis.close()

生产者

async def produce_0():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        i = 0
        while i < 100:
            res = await redis.xadd(
                "test_stream",
                {"domain_name": "test_domain_name_0", "sid": 0},
                maxlen=5,
            )
            print(res)
            i += 1
    finally:
        await redis.close()

async def produce_1():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        i = 0
        while i < 100:
            res = await redis.xadd(
                "test_stream_1",
                {"domain_name": "test_domain_name_1", "sid": 1},
                maxlen=2,
            )
            print(res)
            i += 1
    finally:
        await redis.close()

测试代码

if __name__ == "__main__":
    # two coroutines consume messages from two streams with the same consumer name
    asyncio.run(asyncio.gather(consume_a(), consume_a(), produce_0(), produce_1()))

Tags: namefromtestredislocalhosturlstreamasync
1条回答
网友
1楼 · 发布于 2024-06-16 03:24:28

基于Redis文档:

One of the guarantees of consumer groups is that a given consumer can only see the history of messages that were delivered to it, so a message has just a single owner.

有关更多信息,请阅读以下文档:

相关问题 更多 >