How Kafka chooses a partition when writing data

When data is written to Kafka, the record's key determines which partition it is written to. Here is the implementation:

if (record.partition() != null) {
    // they have given us a partition, use it
    if (record.partition() < 0 || record.partition() >= numPartitions)
        throw new IllegalArgumentException("Invalid partition given with record: "
                                           + record.partition()
                                           + " is not in the range [0..."
                                           + numPartitions
                                           + "].");
    return record.partition();
} else if (record.key() == null) {
    int nextValue = counter.getAndIncrement();
    List<PartitionInfo> availablePartitions = cluster
        .availablePartitionsForTopic(record.topic());
    if (availablePartitions.size() > 0) {
        int part = Utils.abs(nextValue) % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        // no partitions are available, give a non-available partition
        return Utils.abs(nextValue) % numPartitions;
    }
} else {
    // hash the key to choose a partition
    return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
}

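The logic is as follows. If the record specifies a partition id, that partition is used (after checking that it lies in the valid range). If the key is null, a partition is chosen round-robin from the topic's available partitions in the cached cluster metadata; if that list is empty, one is chosen round-robin from all partitions instead. If the key is not null, the key is hashed with murmur2 and the result, taken modulo the total number of partitions, gives the partition id. As a consequence, records with the same key always go to the same partition as long as the partition count does not change.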
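
The three branches above can be tried out without a Kafka cluster. The following is only a minimal sketch of the same decision logic, not Kafka's actual code. The class name `PartitionChooser`, the method `choose`, and its parameters are hypothetical names made up for illustration. `Arrays.hashCode` stands in for `Utils.murmur2`, so the partition numbers it produces will differ from real Kafka. Masking the sign bit in `toPositive` is an assumption about how `Utils.abs` maps the value to a non-negative number.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone sketch of the partition selection rules.
class PartitionChooser {
    // Shared counter that drives the round-robin choice for keyless records.
    private final AtomicInteger counter = new AtomicInteger(0);

    // Assumption: map any int to a non-negative int by clearing the sign bit.
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // partition: explicit partition id, or null if the caller gave none
    // key:       serialized key bytes, or null for a keyless record
    // available: ids of the partitions that are currently available
    int choose(Integer partition, byte[] key, int numPartitions, List<Integer> available) {
        if (partition != null) {
            // Rule 1: an explicit partition wins, but it must be in range.
            if (partition < 0 || partition >= numPartitions) {
                throw new IllegalArgumentException("Invalid partition: " + partition);
            }
            return partition;
        }
        if (key == null) {
            // Rule 2: no key, so rotate through the available partitions.
            int next = toPositive(counter.getAndIncrement());
            if (!available.isEmpty()) {
                return available.get(next % available.size());
            }
            // Nothing is available: fall back to rotating over all partitions.
            return next % numPartitions;
        }
        // Rule 3: hash the key (stand-in hash, NOT murmur2) and take the modulus.
        return toPositive(Arrays.hashCode(key)) % numPartitions;
    }
}
```

With a fresh `PartitionChooser` and available partitions 1 and 3, three consecutive keyless calls return 1, 3, 1, while repeated calls with the same key bytes always return the same partition id.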


