Kafka 2.0: after every crash, restarting the broker rescans the log files and recovery is very slow. Is there some configuration I'm missing?

主宰思想 posted on: 2020-05-14   Last updated: 2020-05-14

Question details

  1. On Kafka 2.0, the broker reloads the local log files after a restart, which makes recovery take a long time.
  2. Relevant log output:
    [2020-05-14 17:44:59,993] INFO [ProducerStateManager partition=ttt-2] Loading producer state from snapshot file '/mnt/vdh/kafka-logs/ttt-2/00000000000001143024.snapshot' (kafka.log.ProducerStateManager)
    [2020-05-14 17:44:59,994] INFO [ProducerStateManager partition=ttt-14] Loading producer state from snapshot file '/mnt/vdc/kafka-logs/ttt-14/00000000000002552144.snapshot' (kafka.log.ProducerStateManager)
    [2020-05-14 17:45:00,001] INFO [ProducerStateManager partition=ttt-0] Writing producer snapshot at offset 84541 (kafka.log.ProducerStateManager)
    [2020-05-14 17:45:00,002] INFO [Log partition=ttt-0, dir=/mnt/vdc/kafka-logs] Recovering unflushed segment 84541 (kafka.log.Log)
    [2020-05-14 17:45:00,002] INFO [Log partition=ttt-0, dir=/mnt/vdc/kafka-logs] Loading producer state till offset 84541 with message format version 2 (kafka.log.Log)
    [2020-05-14 17:45:00,006] INFO [ProducerStateManager partition=ttt-0] Loading producer state from snapshot file '/mnt/vdc/kafka-logs/ttt-0/00000000000000084541.snapshot' (kafka.log.ProducerStateManager)
    [2020-05-14 17:45:00,006] INFO [ProducerStateManager partition=xxx-14] Writing producer snapshot at offset 79643 (kafka.log.ProducerStateManager)
    [2020-05-14 17:45:00,007] INFO [Log partition=xxx-14, dir=/mnt/vdc/kafka-logs] Recovering unflushed segment 79643 (kafka.log.Log)
    [2020-05-14 17:45:00,007] INFO [Log partition=xxx-14, dir=/mnt/vdc/kafka-logs] Loading producer state till offset 79643 with message format version 2 (kafka.log.Log)
    [2020-05-14 17:45:00,007] INFO [ProducerStateManager partition=xxx-11] Writing producer snapshot at offset 92353 (kafka.log.ProducerStateManager)
    [2020-05-14 17:45:00,007] INFO [Log partition=xxx-11, dir=/mnt/vdl/kafka-logs] Recovering unflushed segment 92353 (kafka.log.Log)
    [2020-05-14 17:45:00,008] INFO [ProducerStateManager partition=xxx-7] Loading producer state from snapshot file '/mnt/vdc/kafka-logs/xxx-7/00000000000000529509.snapshot' (kafka.log.ProducerStateManager)
    [2020-05-14 17:45:00,008] INFO [Log partition=xxx-11, dir=/mnt/vdl/kafka-logs] Loading producer state till offset 92353 with message format version 2 (kafka.log.Log)
    [2020-05-14 17:45:00,010] INFO [ProducerStateManager partition=xxx-5] Writing producer snapshot at offset 529393 (kafka.log.ProducerStateManager)
    [2020-05-14 17:45:00,010] INFO [Log partition=xxx-5, dir=/mnt/vdd/kafka-logs] Recovering unflushed segment 529393 (kafka.log.Log)
    [2020-05-14 17:45:00,010] INFO [Log partition=xxx-5, dir=/mnt/vdd/kafka-logs] Loading producer state till offset 529393 with message format version 2 (kafka.log.Log)
    [2020-05-14 17:45:00,011] INFO [ProducerStateManager partition=xxx-6] Writing producer snapshot at offset 87516 (kafka.log.ProducerStateManager)
    [2020-05-14 17:45:00,011] INFO [Log partition=xxx-6, dir=/mnt/vdc/kafka-logs] Recovering unflushed segment 87516 (kafka.log.Log)
    [2020-05-14 17:45:00,011] INFO [Log partition=xxx-6, dir=/mnt/vdc/kafka-logs] Loading producer state till offset 87516 with message format version 2 (kafka.log.Log)
    [2020-05-14 17:45:00,012] INFO [ProducerStateManager partition=xxx-11] Loading producer state from snapshot file '/mnt/vdl/kafka-logs/xxx-11/00000000000000092353.snapshot' (kafka.log.ProducerStateManager)
    
  3. I took a look at the source code; this is the recovery check in kafka.log.Log:

    // We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
    // upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,
    // but we have to be careful not to assume too much in the presence of broker failures. The two most common
    // upgrade cases in which we expect to find no snapshots are the following:
    //
    // 1. The broker has been upgraded, but the topic is still on the old message format.
    // 2. The broker has been upgraded, the topic is on the new message format, and we had a clean shutdown.
    //
    // If we hit either of these cases, we skip producer state loading and write a new snapshot at the log end
    // offset (see below). The next time the log is reloaded, we will load producer state using this snapshot
    // (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state
    // from the first segment.
    // Here it checks whether the message format is v2
    if (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 ||
        (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) {
      // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the
      // last two segments and the last offset. This should avoid the full scan in the case that the log needs
      // truncation.
      offsetsToSnapshot.flatten.foreach { offset =>
        producerStateManager.updateMapEndOffset(offset)
        producerStateManager.takeSnapshot()
      }
    } else {
      val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset
      // With the v2 message format this branch runs; truncateAndReload reloads the log files,
      // and loading several hundred GB of data takes a very long time
      producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())

      // Only do the potentially expensive reloading if the last snapshot offset is lower than the log end
      // offset (which would be the case on first startup) and there were active producers prior to truncation
      // (which could be the case if truncating after initial loading). If there weren't, then truncating
      // shouldn't change that fact (although it could cause a producerId to expire earlier than expected),
      // and we can skip the loading. This is an optimization for users which are not yet using
      // idempotent/transactional features yet.
      if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) {
        logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
          val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset)
          producerStateManager.updateMapEndOffset(startOffset)

          if (offsetsToSnapshot.contains(Some(segment.baseOffset)))
    
  4. Is there some configuration I'm missing that is causing the slow recovery? Any pointers would be appreciated (a possible setting is sketched right below).
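The "Recovering unflushed segment" lines mean the broker did not shut down cleanly (on a clean shutdown Kafka leaves a .kafka_cleanshutdown marker in each log directory and can skip this work), so with the v2 message format it has to rebuild producer state by re-reading segments. There is no setting that skips that rebuild, but recovery within each data directory can be parallelized. A minimal server.properties sketch; the value 8 is only an illustrative starting point, tune it to your disks and CPU:

    # config/server.properties
    # Threads per data directory used for log recovery at startup and for
    # flushing at shutdown. The default is 1, so all partitions in one
    # log.dirs entry are recovered one at a time after an unclean shutdown.
    num.recovery.threads.per.data.dir=8

Beyond that, stopping the broker cleanly (kafka-server-stop.sh or a plain SIGTERM, never kill -9) lets the next startup skip the unflushed-segment recovery entirely.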



  • I'm running into this problem now as well, also on version 2.0. The broker is still doing the loading/writing operations and has been at it for 5 hours without finishing recovery. Have you found a good way to deal with this?

    How long did it take? Kafka's data sits on disk as offline storage; it does not pre-load the whole log when it starts up.
    What you are describing happens after startup: Kafka is already running normally, and it then brings consumption back step by step according to the consumers' state.

    • We have many topics and a lot of data, and it took 3 hours to recover. During recovery the ISR stayed incomplete, or the leader was -1. By "recover" I mean the point where new data starts being loaded again; only after that did the ISR fill back up and the leader come back.
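      While a broker is still recovering, its replicas drop out of the ISR, and a partition shows leader -1 only if no other replica can take over. One way to watch progress is to list the partitions that are still under-replicated or have no leader; a sketch, assuming a ZooKeeper connection string of localhost:2181 (adjust for your cluster):

        # partitions whose ISR is still missing replicas
        bin/kafka-topics.sh --zookeeper localhost:2181 --describe --under-replicated-partitions
        # partitions that currently have no leader (shown as leader -1)
        bin/kafka-topics.sh --zookeeper localhost:2181 --describe --unavailable-partitions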