有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

用于普通生产者| Kafka流的java自定义分区器

我有一个kafka streams应用程序

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);

在kafka 2.4版本中,它是一个将消息分发到不同分区的类,即使使用相同的密钥

RoundRobinPartitioner具有以下实现:

public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    public RoundRobinPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = this.nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return ((PartitionInfo)availablePartitions.get(part)).partition();
        } else {
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }

    public void close() {
    }
}

我的分区器由完全相同的代码组成,但分区方法实现不同,我的代码块是:

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

        int numPartitions = partitions.size();

        int nextValue = nextValue(topic);

        return Utils.toPositive(nextValue) % numPartitions;

    }

当我这样配置时,消息在两种实现中都被分发到不同的分区,但决不使用某些分区

My internal topic

我有50个分区,分区14和34从未收到消息。我的分区不可用。它们是可用的。当我将returnpartition方法更改为14或34时,我的所有消息都会转到该分区。有什么问题吗?这两种实现都没有按预期工作

编辑1:我已尝试使用普通制作人的RoundRobinPartitioner。结果是一样的。生产者不能在多个分区之间平等地生成消息,有些分区从未使用过。原因可能是什么?它不像是缺少的配置

编辑2:我已经调试了RoundRobinPartitioner,并在返回时放置了一个断点。当我只生成一条消息时,生产者会生成两条消息。第一次尝试总是不成功,并且该消息不会发送到任何分区。当我点击continue时,ConcurrentMap的调试索引增加了1。制作人的第二次尝试是成功的

partition()方法在我还找不到的地方被调用

编辑3:这可能与我没有覆盖的onNewBatch方法有关吗

编辑4:此实现适用于卡夫卡客户端2.2,但不适用于2.4。分区接口没有onNewBatch方法。当密钥为null 2.2 vs 2.4时,将更改DefaultPartitioner实现。它能与棒分区相关吗


共 (1) 个答案

  1. # 1 楼答案

    使用UniformStickyPartitioner。在kafka 2.4客户端版本中初始化。圆机器人切割器。类适用于卡夫卡2.2或更低版本。在2.4版本中

    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, UniformStickyPartitioner.class);
    

    应该使用。我认为这与新的StickPartitioner有关