kafka重复消费了主题的所有分区的所有偏移量

—— Only、 发表于: 2019-11-05   最后更新时间: 2019-11-05 20:55:24   2,368 游览

消费端部署了大概七八台服务,设置了10个分区,每个消费端只启动了一个线程在循环消费,所有消费者服务器代码是一样的,也就是说消费者组是相同的。

观察了日志消费端从新消费了以前消费过的偏移量。如果是生产者重复发送了数据,消费端应该不会消费到到相同分区的相同偏移量。

public class Consumer implements  Runnable{
    private static Logger logger = LoggerFactory.getLogger(Consumer.class);
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    private static final String GROUPID = "groupA";
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private boolean flag;

    public boolean isFlag() {
        return flag;
    }

    public void setFlag(boolean flag) {
        this.flag = flag;
    }

    public Consumer(String servers, String topicName, boolean flag) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");//获取记录数
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");//处理消息最大间隔时间
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        this.flag=flag;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        logger.info("---------kafka消费端开始消费---------");
        Gson gson = new Gson();
        try {
            while(flag){
                ConsumerRecords<String, String> msgList = consumer.poll(1000);
                if(null!=msgList&&msgList.count()>0){
                    for (TopicPartition partition : msgList.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = msgList.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            long start = System.currentTimeMillis();
                            try {
                                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
                            } catch (Exception e) {
                                logger.error("kafka is timeout since maybe business code processing to low,topicName:{},currentName:{},commit time:{},error:{}", topic, Thread.currentThread().getName(), (System.currentTimeMillis() - start), record.value(), e);
                                break;
                            } catch (Throwable e) {
                                logger.error("fatal Error:kafka is timeout since maybe business code processing to low,topicName:{},currentName:{},message:{},error:{}", topic, Thread.currentThread().getName(), (System.currentTimeMillis() - start), record.value(), e);
                                break;
                            }
                            String reqJson = record.value();
                            ProposalSaveFor3105RequestDto baseRequestDto = null;
                            String proposalNo = "";
                            try {
                                baseRequestDto = gson.fromJson(reqJson,ProposalSaveFor3105RequestDto.class);
                                proposalNo = baseRequestDto.getPolicyDataDto().getInsuredItemProcutDtoList().get(0).getInsuredDto().getProposalNo();
                                MDC.put("uuid", proposalNo);
                                logger.info("kafka拉取了投保单号:"+proposalNo+",分区编码:"+record.partition()+",偏移量:"+record.offset());
                            }catch (Exception e){
                                e.printStackTrace();
                                logger.error("kafka消费端异常---转换报文报错---报文内容:"+reqJson+",异常信息:",e);
                            }
                            if(baseRequestDto!=null){
                                try {
                                    logger.info("kafka开始调用业务,投保单号"+proposalNo);
                                    KafkaProposalSaveFor3105ServiceImpl service = new KafkaProposalSaveFor3105ServiceImpl();
                                    service.saveProposal(baseRequestDto);
                                    logger.info("kafka结束调用业务,投保单号"+proposalNo);
                                }catch (Exception e){
                                    e.printStackTrace();
                                    logger.error("kafka消费端业务逻辑执行异常---异常信息:",e);
                                }
                            }
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error("kafka消费端消费信息时异常---异常信息:",e);
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

日志:

 ts:2019-11-01 21:12:11.496 msg:kafka拉取了投保单号:9127264100190000072000,分区编码:6,偏移量:6 
 ts:2019-11-04 14:25:17.075 msg:kafka拉取了投保单号:9127264100190000072000,分区编码:6,偏移量:6 
 ts:2019-11-05 14:40:11.835 msg:kafka拉取了投保单号:9127264100190000072000,分区编码:6,偏移量:6 
观察日志,过了几天竟然重复去拉去了以前的消息,实在搞不懂是什么情况。我只写了消费端,所以只贴了消费端代码。
发表于 2019-11-05
添加评论

这个代码看着很熟悉,没问题,可以先将offset策略改成

auto.offset.reset=latest

消费者组只要是同一个肯定没问题,确认下是否有新的消费者组

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

谢谢,这个是查询所有的消费者组吗?能查某个主题下的组吗?这样才能知道有没有其他组在消费我的主题把

先不用关心其他消费者组,组之间都是全量拉取消息的。

对的,“组之间都是全量拉取消息的”,我没明白你说的“确认下是否有新的消费者组”,我查出了所有消费者组,怎么确定是否有新的消费者组呢?如果有新的消费者组能说明什么呢?也有可能有的组是其他系统在用,但不是订阅的我的主题啊

1、如果有新的消费者组,是为了排查是否你的程序导致的每次组名不同导致每次都拉取新的消息
2、排查自己的消费者组中的成员是否都是你的,如对应的ip,是否都是属于你自己的消费者程序。(有些开发者代码都是互相复制到,就遇到过配置一样的消费者组)
3、本来我是怀疑提交offset的时候,提交的是老的offset上了,导致的重复消费,但是代码我之前写的就没问题了。

所以,现在的排查方式是观察消费者的整体情况,除了通过上面的命令,也可以直接安装一个kafka monitor监控查看。

我在测试环境和生产环境用的消费者组是一样的,而且kafka服务我们生产和测试是同一个服务器,但生产环境和测试环境用的topic不一样,这个有影响吗

这个不会的,

你的答案

查看kafka相关的其他问题或提一个您自己的问题