kafka一个topic分了4个区(Partition),生产者(Producer)发送消息时,怎么自动的平均或随机分配到各个区?

华明 发表于: 2020-03-16   最后更新时间: 2020-03-17 09:20:10   3,581 游览

我一个topic分了4个区(Partition),生产者(Producer)发送消息时,现在没指定分区,所以消息都存在一个区了。请问,能自动的平均或随机分配到不同区吗?

版本是kafka_2.12-2.4.0;场景是我topic生产的消息,想让几个消费者同时并发消费这些消息,现在我的消费者有多个,但是只有一个消费者在消费(消费者group id一样),groupid相同是不想要广播似的消费。

相关代码

未指定分区的生产消息:
public void sendNoPartition(String topic, String key, MessageEntity entity) {

   ProducerRecord<String, MessageEntity> record = new ProducerRecord<>(
           topic,
           key,
           entity);

   long startTime = System.currentTimeMillis();

   ListenableFuture<SendResult<String, MessageEntity>> future = kafkaTemplate.send(record);
   future.addCallback(new ProducerCallback(startTime, key, entity));
}

我如下修改代码,生产时带上分区号。如此,消息会随机的到不同分区,各个消费者会并发消费。我的问题是,在生产时有什么设置吗?会自动分配到不同区。或者在消费者端,怎么设置 让这些groupid相同的消费者去并发的消费这个topic的消息。

(正常)指定了分区的生产消息:
public void sendRandomPartition(String topic, String key, MessageEntity entity) {

    List listThePartions = kafkaTemplate.partitionsFor(topic);

    for (int i = 0; i < listThePartions.size(); i++) {
        System.out.println(listThePartions.get(i));
    }
    Random rand = new Random();
    int fenpeiPartition = rand.nextInt(listThePartions.size());
    System.out.println("将分配的分区为: " + fenpeiPartition);
    ProducerRecord<String, MessageEntity> record = new ProducerRecord<>(
            topic,
            *fenpeiPartition*,
            key,
            entity);

    long startTime = System.currentTimeMillis();

    ListenableFuture<SendResult<String, MessageEntity>> future = kafkaTemplate.send(record);
    future.addCallback(new ProducerCallback(startTime, key, entity));
}
发表于 2020-03-16

未指定分区的生产消息的代码我看是没问题的,客户端默认是轮询消息到各个分区的,你的新分区是提前创建好的,还是在生产者发送中创建的?

华明 -> 半兽人 4年前

谢谢你,原来客户端默认是轮询消息到各个分区的。
我的新分区是通过kafka-topics.bat --alter --zookeeper 127.0.0.1:2181 --partitions 2 这种命令行提前更改好的。更改后,才生产消息,但是发现生产的消息 只到其中一个区了,其他区没有消息。

多找了一下资料,kafka生产者消息如何分区:默认是你说的轮询策略,还有随机策略,按key分区策略,其他分区策略,比如按ip地址。我现在改了一下,把key值改为随机:

String key = "key_"+getRandomString(5);
simpleProducer.sendNoPartition(topic, key, message);

如此,生产的消息也会分到各个区。

半兽人 -> 华明 4年前

key你写死不会变的嘛。。。。。

半兽人 -> 华明 4年前

既然你已经查了资料,key的作用可不是你加了随机数这么用的(那还不如不传,null),kafka设计的时候有几个核心。

  1. 如果你的消息发送的时候,kafka未响应,则通过key的相同,可以在发一遍,不会重复发送消息。
  2. 通过key和压缩消息,比如 小明的邮箱从xxx@xx.com改为了bbb@xx.com,又改成了ccc@xx.com,最后压缩后,只保留ccc@xx.com
华明 -> 半兽人 4年前

谢谢!

你的答案

查看kafka相关的其他问题或提一个您自己的问题