有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java kafka启用默认robbin分区器

在我的其中一个应用程序中,我需要在我的卡夫卡制作者上使用一个Round Robbin键分区策略

写入不同分区仅在以下设置下有效(1)

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyRandomPartioner.class);

并且MyRandomPartitioner类的实现如下:

public class MyRandomPartioner implements Partitioner {
    private Logger logger = LoggerFactory.getLogger(MyRandomPartitioner.class);

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        logger.info(" Partition of Topic :" + numPartitions);
            Random randomGenerator = new Random();
            int randomInt = randomGenerator.nextInt(4) + 1;
            logger.info(" selected Partition of Topic :" + randomInt);
            return  randomInt;

    }

    @Override
    public void close() {
    }

} 

因为我想有平等的分布,我禁用了上面的道具(1),然后它总是写入单个分区

我的制作人代码:

void sendData(String operation, String message){
final ProducerRecord<String, String> record = new ProducerRecord<String, String>(producerKafkaConfig.getTopicName(), operation,message);
            producer.send(record, new ProducerCallback()); 
        }
//Here operation is always fixed and message is my actual content. 

共 (1) 个答案

  1. # 1 楼答案

    由于您的记录是键和值,默认的分区程序将检查键,如果键不存在,则只会执行正常的分区,否则将根据键计算哈希

    如果无法删除记录的密钥,您可以使用以下分区代码

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] 
    
        valueBytes, Cluster cluster) {
                List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
                int numPartitions = partitions.size();
    
                int nextValue = nextValue(topic);
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                if (availablePartitions.size() > 0) {
                    int part = Utils.toPositive(nextValue) % availablePartitions.size();
                    return availablePartitions.get(part).partition();
                } else {
                    // no partitions are available, give a non-available partition
                    return Utils.toPositive(nextValue) % numPartitions;
                }
    
            }