kafka 消费者组 10个消费者分布在不同的机器上,因为机器上线是串行的,发生重复消费。

。莪 发表于: 2023-10-12   最后更新时间: 2023-10-12 16:01:34   381 游览

kafka 消费者组 10个消费者分布在不同的机器上,因为机器上线是串行的,发生重复消费。
串行上线机器A,在A机器上的消费者先退出再加入,这个时候导致了再均衡。
提交方式=手动提交。
消费者:会分配多个分区。

问题:

  1. 消费者组里面的消费者重复的原因。
  2. kafka再均衡过程中,正在拉去数据 | 正在处理处理 | 正在提交数据 不同状态下的消费者是怎么参与再均衡的。
发表于 2023-10-12
添加评论

kafka消费者是批量拉取消息的,比如一次拉取2000条。

  1. 如果你拉取了2000条,直接提交offset,那如果你消费者消费第1000条,报错了,那么就会丢失1000条。
  2. 如果你拉取了2000条,想先消费在提交offset,那当你先消费了1000条时,报错了,重启消费者后会重新消费这2000条,因为你没有提交offset,那么会重新消费者2000条,就出现重复消费。

说到这里,简单的消费者逻辑你已经理解了,问题的核心在于你什么时候提交这个offset。

。莪 -> 半兽人 8月前

我是拉取2000条,全部处理完成之后再进行偏移量提交。

。莪 -> 半兽人 8月前

我是通过一个线程包装成一个消费者,在关闭程序的时候是通过优雅停机,在线程里面循环判断是否中断,再退出消费者,正常情况下都是这批消息处理完,提交偏移量之后,下一次循环进行退出,理论上不应该出现重复数据。

半兽人 -> 。莪 8月前

没有那么多玄幻的情况,首先你要确保你提交的offset是正确的分区并且是成功的。

。莪 -> 半兽人 8月前

目前看我对应分区的偏移量正确提交了,成功日志正常打印了。

public void run() {
    // 设置线程Name
    threadName = Thread.currentThread().getName() + BuildUtils.buildUniqueNo();
    Thread.currentThread().setName(threadName);
    LOGGER.info("UserComponentConsumer init threadName:{}!", threadName);
    try {
        // 监控消费者
        Thread.sleep(50);
        // 订阅Topic
        consumer.subscribe(Collections.singletonList(componentGroupConfig.getTopic()), new UserComponentRebalanceHandler());
        // init 偏移量
        consumerInitOffset(new ArrayList<>(consumer.assignment()), consumer);
        // 循环接收消息
        while (consumerOpenFlag && !Thread.currentThread().isInterrupted()) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (TopicPartition partition : records.partitions()) {
                // 依次处理分区批量消息
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                long firstOffset = partitionRecords.get(0).offset(); // 分区批量消息 头偏移量
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); // 分区批量消息 尾偏移量
                try {
                    // 消息解析
                    messageDispose(partitionRecords);
                } catch (Exception exception) {
                    LOGGER.error("UserComponentConsumer messageDispose threadName:{} topic:{}, partition:{} firstOffset:{} lastOffset: {} error:{}", threadName, partition.topic(), partition.partition(), firstOffset, lastOffset, exception);
                }
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset))); // 同步提交
                LOGGER.info("UserComponentConsumer threadName:{} topic:{}, partition:{} firstOffset:{} lastOffset: {}, GroupId:{} !!", threadName, partition.topic(), partition.partition(), firstOffset, lastOffset, componentGroupConfig.getGroupId());
            }
        }
    } catch (WakeupException ex) {
        LOGGER.error("UserComponentConsumer threadName:{} WakeupException:{}", threadName, ex);
    } catch (InterruptedException ex) {
        LOGGER.error("UserComponentConsumer threadName:{} current thread of Interrupted, InterruptedException:{} !!", threadName, ex);
        // 恢复线程的中断状态
        Thread.currentThread().interrupt();
    } catch (Exception ex) {
        LOGGER.error("UserComponentConsumer threadName:{} Exception error:{} ", threadName, ex);
    } finally {
        // 消费者退出
        consumer.close();
    }
}
你的答案

查看kafka相关的其他问题或提一个您自己的问题