有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java Kafka将消息分配给特定的消费者组

我有一个关于Kafka组ID的小问题,我可以在Java中使用以下注释:

    @KafkaListener(topics = "insert", groupId = "user")

在那里,我可以设置一个它想要使用的groupId,但我不会只使用这个组id,可能是因为我不能发送到特定的组id。我如何只发送到一个特殊的groupId?关于我可以使用GroupID的内容,或者我需要设置发送卡夫卡消息的特定主题

我已经试着在网上找到答案了,但是我什么也没找到,也许我用的是google false哈哈

我希望所有人都能理解我,如果不是,请提问:)

已经有很多了


共 (1) 个答案

  1. # 1 楼答案

    欢迎来到Kafka!首先:你不能发送给消费者群体,你只能发送给Topic


    下面的文本太多。在阅读整个答案时,要注意可能的睡意
    如果你还在读这篇文章,我想你真的想知道如何将信息发送给特定的客户,或者你真的需要尽快睡一觉。 也许两者都有。以后不要开车


    回到你的问题上来

    从该主题可以读取多个Consumer Groups。每个CG都是独立的,因此每个人都会自始至终地阅读主题把一个CG想象成一个内恐惧症消费者的联盟:他们不会关心其他群体,他们不会与其他群体交谈,他们甚至不知道是否存在其他群体

    通过使用不同的方法和/或体系结构,我可以想出三种不同的方法来实现您的目标。唯一使用Consumer Groups的是第一个,但其他两个也可能有用:

    • subscribe
    • assign
    • 多个主题

    前两个基于在单个主题中划分消息的机制。第三种方法只能在某些情况下成立。让我们进入这些选项


    1Subscribe and Consumer Groups

    您可以创建一个新主题,用消息填充它,并添加一些元数据,以便识别需要处理每条消息的人(该消息指向的人

    存储在卡夫卡中的消息除其他字段外,还包含一个KEY和一个VALUE(消息本身)

    假设您只希望GROUP-A处理一些特定的消息。一个简单的解决方案是在键上包含一个标识符,例如后缀。其中一个键可能看起来像:key#GA

    在消费者方面,您可以poll()从该主题获取消息,并在处理之前添加一点额外的条件逻辑:您只需读取键并检查后缀。如果它与指定的使用者组相对应,在本例中,它包含GA,则来自GROUP-A的使用者知道它必须处理该消息

    例如,您的主题存储两种不同性质的消息,您希望将它们定向到两个组:GROUP-AGROUP-Z

         key    value
     - [11#GA][MESSAGE]
     - [21#GZ][MESSAGE]
     - [33#GZ][MESSAGE]
     - [44#GA][MESSAGE]
    

    两个消费群体都会对这4条消息进行投票,但每个群体只会处理其中的一些消息

    • Group-A将丢弃第二条和第三条消息它将处理第1个和第4个

    • Group-Z将丢弃第一条和第四条消息它将处理第二个和第三个

    这基本上是你的目标,但使用一些额外的逻辑和玩卡夫卡的架构。带有特定后缀的消息将被“定向到”特定的消费者群体,并被其他消费者群体忽略


    2Assign

    上述解决方案侧重于消费者群体和卡夫卡的subscribe方法。另一个可能的解决方案是使用卡夫卡的assign方法,而不是订阅消费者群体。此处不涉及ConsumerGroup,因此将引用之前的以避免任何混淆

    Assign允许您直接指定消费者必须阅读的主题/分区

    在生产者端,您应该使用自己的逻辑对消息进行分区,以便在主题内的分区之间进行分区。关于自定义分区器的一些更深入的信息here是的,链接中的作者看起来像一个彻底的傻瓜

    例如,假设您有5种不同的类型g> 的consumers。因此,您创建了一个包含5个分区的Topic,每个“组”对应一个分区。您的producer的自定义分区器为每条消息标识相应的分区,在生成上一示例中的消息后,主题将呈现此结构:

    enter image description here

    为了将消息定向到相应的“组”

    • "Group-Z"分配到第五个分区
    • "Group-A"分配到第一个分区

    此解决方案的优点是浪费的资源更少:每个“组”只轮询自己的消息,当验证每个消息都指向轮询它的消费者时,您可以避免丢弃/接受逻辑:线路上的通信量更少,内存中的对象更少,cpu工作更少

    disadvatange包含一个更复杂的卡夫卡制作人机制,该机制涉及一个自定义分区器,该分区器肯定应该不断更新有关数据或主题结构变化的信息。此外,这还将导致在每次更改生产方时更新您的消费者定义的分配


    个人备忘:

    Assign提供了更好的性能,但代价很高:手动和持续控制生产者、主题、分区和消费者,因此(可能)更容易出错。我称之为有效的解决方案

    Subscribe使所有过程更加简单,并且可能会减少系统上的问题/错误,因此更加可靠。我称之为有效的解决方案

    无论如何,这完全是一种主观的质疑


    尚未完成

    3. Multi-topic solution.

    先前提出的解决方案假设消息具有相同的性质,因此将在同一主题中生成

    为了解释我在这里要说的内容,让我们假设一个主题被表示为一个存储建筑

    Your electronic devices topic< laptops, tables, smartphones,...

    以前的解决方案假设您在那里存储类似的元素,例如,电子设备;它们的使用寿命是相似的,存储方法是相似的,无论具体的设备类型如何,使用的机器是相同的,等等。考虑到这一点,将所有这些元素存储到同一个仓库中,分为不同的部分(分为相同的主题,分为不同的分区)是完全合乎逻辑的

    没有真正的理由为每个电子设备系列(一个用于电视,另一个用于移动电话,)建立一个新的仓库,除非你用金钱包装。以前的解决方案假定您的消息是不同类型的“电子设备”

    但随着时间的推移,你做得很好,所以决定开始一项新的业务:水果储存

    水果有更少的生命(log.retention.ms任何人?),必须存储在一定温度范围内,并且您的设备存储元件和技术可能与第一个仓库有很大不同。此外,您的水果业务可能会在一年中的某些时段关闭,而电子设备将全天候接收。即使你每天打开你设备的仓库,也可能水果仓库只在周一和周二工作(幸运的是,不会因为这段时间而暂时关闭)

    由于水果和电子设备需要不同类型的存储管理,您决定建立一个新的仓库。你的新话题是什么

    bananas here< bananas, kiwis, apples, chicozapotes,...

    创建第二个主题在这里是合理的,因为每个主题可能需要不同的配置值,并且每个主题都存储来自不同性质的内容。这导致消费者的处理逻辑也非常不同

    那么,这是第三种可能的解决方案吗

    嗯,它确实让你忘记了about用户组、分区机制、手动分配等。您只需决定哪些用户订阅了哪个主题,就完成了:您有效地将消息定向到了特定的用户

    但是,如果你建了一个仓库并开始储存电脑,你真的会建另一个仓库来储存刚到货的手机吗在现实生活中,你需要支付第二栋楼的建设费用,还要支付两项税费,还要支付两栋楼的清洁费用,等等

    laptops here->{a8}{}

    在卡夫卡的世界中,这将被表示为卡夫卡集群的额外工作(两次复制请求,zookeeper有一个新ACL和控制器的新生儿,…),分配给这项工作的人的额外时间,因为现在负责管理两个主题:一个工人花时间在可以避免的事情上是公司损失欧元的同义词。此外,我不知道他们是否已经这样做了,也不知道他们是否会这样做,但云提供商不知何故喜欢为某些操作插入小额税收,例如创建一个主题(,但这只是一种可能性,我在这里可能是错的

    继续说,这不一定是个坏主意:它只需要一个合理的上下文。如果您正在使用香蕉和高通芯片,请使用它

    如果您正在使用笔记本电脑和平板电脑,请选择前面显示的消费群体和分区解决方案