kafka发送失败数据获取问题

Icuiacd 发表于: 2019-11-04   最后更新时间: 2019-11-04  

使用producer.send(record, new Callback() {});异步发送数据,手动关停集群,如何获取发送失败的数据

已知可以每次发送调用get方法阻塞

RecordMetadata metadata=producer.send(record).get();

但是吞吐量就降低了



您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容




上一条: kafka重复消费了主题的所有分区的所有偏移量
下一条: 使用kafka connect ,mysql作为输入,输出也是mysql 报错record value schema is missing

  • 1、关闭项目是不会导致消息丢失的(kill而不是kill -9)。
    2、更安全的方式是在项目停止之前,调用一下producer.close();,防止生产者中还有缓存的消息未发送。
    3、强制关闭(kill -9)任何方式都会丢失消息,不在此次讨论范围内,必丢消息,此种场景几乎不存在。

    • 谢谢,但实际我想问的不是如何不丢数据,而是如何获取到发送失败的数据,语言是java,写入采用带callback的send方法,异常的时候可以捕获到exception,但是无法获取到对应的数据

        • for (int i = 0; i < 999999999; i++) {
              ProducerRecord<String, String> record=new ProducerRecord<String, String>("topicname", 0, "key", "测试汉字测试汉字测试汉字测试汉字测试汉字测试汉字测试汉字"+i);
              producer.send(record, new Callback() {
                  @Override
                  public void onCompletion(RecordMetadata metadata, Exception exception) {
                      // TODO Auto-generated method stub
                      System.out.println(metadata.offset());
                      if(metadata.offset()==-1L){
                          log.info(metadata.toString());
                          log.info(exception.toString()+"\n"+exception.getMessage());
                          exception.printStackTrace();
                      }
                  }
              });
          }
          
            • -1
              [2019-11-04 17:03:25,476] INFO topicname-0@-1 (kafka.ProducerFor7667)
              [2019-11-04 17:03:25,476] INFO org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for testcaoyh-0: 30085 ms has passed since last append
              Expiring 1 record(s) for topicname-0: 30085 ms has passed since last append (kafka.ProducerFor7667)
              org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for topicname-0: 30085 ms has passed since last append
              
                • 1、new callback(m) 时,可以把你原始消息(m)也传递到类里。
                  2、callback将失败的加到失败队列里,从上层重新发送(不要在callback中重新发送)。