KafkaOffsetMonitor: Monitoring Kafka Consumers and Their Lag

    Original
半兽人 · published: 2015-03-10 · last updated: 2016-10-14

A small application that monitors the progress of Kafka consumers and how far they lag behind the queue.

KafkaOffsetMonitor provides real-time monitoring of the consumers in a Kafka cluster and their positions (offsets) in each queue.

You can view the current consumer groups and, for each topic, the consumption state of every partition. This makes it easy to tell how quickly messages in each partition are being consumed, how fast each queue is growing, and so on. It is useful for debugging Kafka producers and consumers, since it shows you exactly what is happening in your system.

This web UI keeps historical data of partition offsets and consumer lag (how many days of data to retain is configurable at startup), so you can easily review consumption over the past few days.

KafkaOffsetMonitor is written in Scala. Its historical data is stored in a database file named offsetapp.db, which is a very lightweight SQLite file. Although you can configure the refresh frequency and the retention period at startup, avoid refreshing too often or retaining too much data: the charts will render slowly, or the process may even run out of memory.

All information about message offsets, the number of brokers in the Kafka cluster, and so on is read from Zookeeper; the log size is computed.

Consumer group list

screenshot

Topic list of a consumer group

screenshot

The fields in the screenshot mean the following:

topic: the name given when the topic was created
partition: the partition number
offset: how many messages of this partition have been consumed
logSize: how many messages have been written to this partition
Lag: how many messages have not been consumed yet
Owner: the consumer that owns this partition
Created: when the partition was created
Last Seen: the last time the consumption state was refreshed
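The Lag column is simply the difference between logSize and offset. As a minimal sketch (illustrative only; this is not KafkaOffsetMonitor's actual code):

```scala
// Illustrative sketch: how Lag relates to offset and logSize for one partition.
case class PartitionState(topic: String, partition: Int, offset: Long, logSize: Long) {
  def lag: Long = logSize - offset // messages written but not yet consumed
}

object LagDemo extends App {
  val p = PartitionState("orders", partition = 0, offset = 950L, logSize = 1000L)
  println(s"${p.topic}-${p.partition} lag = ${p.lag}") // lag = 50
}
```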

Topic offset history

screenshot

Where offsets are stored

Kafka manages offsets flexibly: they can be kept in any store and in any format. KafkaOffsetMonitor currently supports the following popular storage options.

  • Before Kafka 0.9, offsets were stored in Zookeeper by default (Zookeeper-based)
  • From Kafka 0.9 on, offsets are stored in an internal topic by default (based on Kafka's internal topic)
  • Storm Kafka Spout (Zookeeper-based by default)

Each running instance of KafkaOffsetMonitor supports only a single storage type.
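Conceptually, the `--offsetStorage` option selects exactly one backend per instance. A hypothetical sketch of that selection (names here are illustrative, not the project's actual code):

```scala
// Hypothetical sketch: mapping the --offsetStorage option to a single backend.
// (Illustrative only; not KafkaOffsetMonitor's actual factory code.)
object OffsetStorageArg {
  sealed trait Backend
  case object Zookeeper extends Backend
  case object Kafka extends Backend
  case object Storm extends Backend

  def parse(arg: String): Backend = arg.toLowerCase match {
    case "zookeeper" => Zookeeper
    case "kafka"     => Kafka
    case "storm"     => Storm
    case other       => sys.error(s"unsupported offsetStorage: $other")
  }
}
```

Because each instance accepts only one of these, monitoring consumers that commit to different backends requires running separate instances.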

Download

You can download the KafkaOffsetMonitor source from GitHub:

https://github.com/quantifind/KafkaOffsetMonitor

Build KafkaOffsetMonitor with:

sbt/sbt assembly

That said, building it yourself is not recommended: the compiled jar references external CSS and JS, all hosted on foreign servers, so the UI only works with internet access, and you would have to patch the JS paths before building. I have already taken care of this; just download the prebuilt jar.

Baidu Pan: https://pan.baidu.com/s/1kUZJrCV

Startup

After building, a jar file like KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar is produced in the KafkaOffsetMonitor root directory. It bundles all dependencies, so we can start it directly:

java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar \
     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
     --offsetStorage kafka \
     --zk zk-server1,zk-server2 \
     --port 8080 \
     --refresh 10.seconds \
     --retain 2.days

Startup option 2: create a script, since you may have more than one Kafka cluster; a script makes it easy to run multiple instances.

vim mobile_start_en.sh

        nohup java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m \
             -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar \
             com.quantifind.kafka.offsetapp.OffsetGetterWeb \
             --offsetStorage kafka \
             --zk 127.0.0.1:2181 \
             --port 8080 \
             --refresh 10.seconds \
             --retain 2.days 1>mobile-logs/stdout.log 2>mobile-logs/stderr.log &

The parameters mean:

  • offsetStorage: valid options are "zookeeper", "kafka", and "storm". For Kafka 0.9 and later, offsets are stored in Kafka.
  • zk: the Zookeeper address
  • port: the port for the web UI
  • refresh: how often the data is refreshed and written to the DB
  • retain: how long data is kept in the DB
  • dbName: where the records are stored (default 'offsetapp')
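The `10.seconds` / `2.days` argument values follow Scala-style duration notation; the same durations written in Scala code:

```scala
import scala.concurrent.duration._

// The --refresh / --retain arguments use Scala-style duration notation.
object DurationDemo extends App {
  val refresh = 10.seconds // poll and persist offsets every 10 seconds
  val retain  = 2.days     // keep two days of history in offsetapp.db
  println(s"refresh = ${refresh.toMillis} ms, retain = ${retain.toHours} h")
}
```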



  • Hi, I downloaded the source from GitHub and changed that class as you described. How do I repackage it into a jar with the sbt/sbt assembly command? Do you have a modified jar handy?

    Boss! KafkaOffsetMonitor only shows the topics and nothing else, while the command-line tools can see everything. What's going on?

    • WARN  KafkaOffsetGetter$:89 - Failed to process one of the commit message due to exception. The 'bad' message will be skipped
      java.lang.RuntimeException: Unknown offset schema version 2
              at com.quantifind.kafka.core.KafkaOffsetGetter$.schemaFor(KafkaOffsetGetter.scala:162)
              at com.quantifind.kafka.core.KafkaOffsetGetter$.com$quantifind$kafka$core$KafkaOffsetGetter$$readMessageKey(KafkaOffsetGetter.scala:187)
              at com.quantifind.kafka.core.KafkaOffsetGetter$$anonfun$startOffsetListener$1.apply$mcV$sp(KafkaOffsetGetter.scala:100)
              at com.quantifind.kafka.core.KafkaOffsetGetter$$anonfun$startOffsetListener$1.apply(KafkaOffsetGetter.scala:87)
              at com.quantifind.kafka.core.KafkaOffsetGetter$$anonfun$startOffsetListener$1.apply(KafkaOffsetGetter.scala:87)
              at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
              at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
              at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
              at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
              at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
              at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
              at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      

      I'm on version 2.2.0. This error appears after the consumer starts. With zookeeper storage a consumer group shows up, but not the one I created; with kafka storage nothing shows at all. In both modes only the topics are displayed.

        • package com.quantifind.kafka.core
          import java.nio.ByteBuffer
          import com.quantifind.kafka.OffsetGetter.OffsetInfo
          import com.quantifind.kafka.OffsetGetter
          import com.quantifind.utils.ZkUtilsWrapper
          import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
          import kafka.common.{OffsetAndMetadata, TopicAndPartition}
          import kafka.consumer.{ConsumerConnector, KafkaStream}
          import kafka.message.MessageAndMetadata
          import kafka.utils.Logging
          import org.I0Itec.zkclient.ZkClient
          import scala.collection._
          import scala.concurrent.ExecutionContext.Implicits.global
          import scala.concurrent.Future
          import scala.util.control.NonFatal
          class KafkaOffsetGetter(theZkClient: ZkClient, zkUtils: ZkUtilsWrapper = new ZkUtilsWrapper) extends OffsetGetter {
            import KafkaOffsetGetter._
            override val zkClient = theZkClient
            override def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] = {
              try {
                zkUtils.getLeaderForPartition(zkClient, topic, pid) match {
                  case Some(bid) =>
                    val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(bid))
                    consumerOpt flatMap { consumer =>
                        val topicAndPartition = TopicAndPartition(topic, pid)
                        offsetMap.get(GroupTopicPartition(group, topicAndPartition)) map { offsetMetaData =>
                          val request =
                            OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
                          val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
                          OffsetInfo(group = group,
                            topic = topic,
                            partition = pid,
                            offset = offsetMetaData.offset,
                            logSize = logSize,
                            owner = Some("NA"))
                        }
                    }
                  case None =>
                    error("No broker for partition %s - %s".format(topic, pid))
                    None
                }
              } catch {
                case NonFatal(t) =>
                  error(s"Could not parse partition info. group: [$group] topic: [$topic]", t)
                  None
              }
            }
            override def getGroups: Seq[String] = {
              topicAndGroups.groupBy(_.group).keySet.toSeq
            }
            override def getTopicList(group: String): List[String] = {
              topicAndGroups.filter(_.group == group).groupBy(_.topic).keySet.toList
            }
            override def getTopicMap: Map[String, scala.Seq[String]] = {
              topicAndGroups.groupBy(_.topic).mapValues(_.map(_.group).toSeq)
            }
            override def getActiveTopicMap: Map[String, Seq[String]] = {
              getTopicMap
            }
          }
          object KafkaOffsetGetter extends Logging {
            val ConsumerOffsetTopic = "__consumer_offsets"
            val offsetMap: mutable.Map[GroupTopicPartition, OffsetAndMetadata] = mutable.HashMap()
            val topicAndGroups: mutable.Set[TopicAndGroup] = mutable.HashSet()
            def startOffsetListener(consumerConnector: ConsumerConnector) = {
              Future {
                try {
                  logger.info("Starting Kafka offset topic listener")
                  val offsetMsgStream: KafkaStream[Array[Byte], Array[Byte]] = consumerConnector
                    .createMessageStreams(Map(ConsumerOffsetTopic -> 1))
                    .get(ConsumerOffsetTopic).map(_.head) match {
                    case Some(s) => s
                    case None => throw new IllegalStateException("Cannot create a consumer stream on offset topic ")
                  }
                  val it = offsetMsgStream.iterator()
                  while (true) {
                    try {
                      val offsetMsg: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
                      val commitKey: GroupTopicPartition = readMessageKey(ByteBuffer.wrap(offsetMsg.key()))
                      val commitValue: OffsetAndMetadata = readMessageValue(ByteBuffer.wrap(offsetMsg.message()))
                      info("Processed commit message: " + commitKey + " => " + commitValue)
                      offsetMap += (commitKey -> commitValue)
                      topicAndGroups += TopicAndGroup(commitKey.topicPartition.topic, commitKey.group)
                      info(s"topicAndGroups = $topicAndGroups")
                    } catch {
                      case e: RuntimeException =>
                        // sometimes offsetMsg.key() || offsetMsg.message() throws NPE
                        warn("Failed to process one of the commit message due to exception. The 'bad' message will be skipped", e)
                    }
                  }
                } catch {
                  case e: Throwable =>
                    fatal("Offset topic listener aborted due to unexpected exception", e)
                    System.exit(1)
                }
              }
            }
            // massive code stealing from kafka.server.OffsetManager
            import java.nio.ByteBuffer
            import org.apache.kafka.common.protocol.types.Type.{INT32, INT64, STRING}
            import org.apache.kafka.common.protocol.types.{Field, Schema, Struct}
            private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema)
            private val OFFSET_COMMIT_KEY_SCHEMA_V0 = new Schema(new Field("group", STRING),
                                                                 new Field("topic", STRING),
                                                                 new Field("partition", INT32))
            private val KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("group")
            private val KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("topic")
            private val KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("partition")
            private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
                                                                   new Field("metadata", STRING, "Associated metadata.", ""),
                                                                   new Field("timestamp", INT64))
            private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
                                                                   new Field("metadata", STRING, "Associated metadata.", ""),
                                                                   new Field("commit_timestamp", INT64),
                                                                   new Field("expire_timestamp", INT64))
            private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
                                                                   new Field("metadata", STRING, "Associated metadata.", ""),
                                                                   new Field("commit_timestamp", INT64))
            private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(new Field("offset", INT64),
                                                                   new Field("leader_epoch", INT32),
                                                                   new Field("metadata", STRING, "Associated metadata.", ""),
                                                                   new Field("commit_timestamp", INT64))
            private val VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
            private val VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
            private val VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
            private val VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
            private val VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
            private val VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
            private val VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
            private val VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
            private val VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
            private val VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
            private val VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
            private val VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
            private val VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
            // private val VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
            // map of versions to schemas
            private val OFFSET_SCHEMAS = Map(0 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V0),
                                             1 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V1),
                                             2 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V2),
                                             3 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V3))
            private def schemaFor(version: Int) = {
              val schemaOpt = OFFSET_SCHEMAS.get(version)
              schemaOpt match {
                case Some(schema) => schema
                case _ => throw new RuntimeException("Unknown offset schema version " + version)
              }
            }
            case class MessageValueStructAndVersion(value: Struct, version: Short)
            case class TopicAndGroup(topic: String, group: String)
            case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {
              def this(group: String, topic: String, partition: Int) =
                this(group, new TopicAndPartition(topic, partition))
              override def toString =
                "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
            }
            /**
             * Decodes the offset messages' key
             *
             * @param buffer input byte-buffer
             * @return an GroupTopicPartition object
             */
            private def readMessageKey(buffer: ByteBuffer): GroupTopicPartition = {
              val version = buffer.getShort()
              val keySchema = schemaFor(version).keySchema
              val key = keySchema.read(buffer).asInstanceOf[Struct]
              val group = key.get(KEY_GROUP_FIELD).asInstanceOf[String]
              val topic = key.get(KEY_TOPIC_FIELD).asInstanceOf[String]
              val partition = key.get(KEY_PARTITION_FIELD).asInstanceOf[Int]
              GroupTopicPartition(group, TopicAndPartition(topic, partition))
            }
            /**
             * Decodes the offset messages' payload and retrieves offset and metadata from it
             *
             * @param buffer input byte-buffer
             * @return an offset-metadata object from the message
             */
            private def readMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
              val structAndVersion = readMessageValueStruct(buffer)
              if (structAndVersion.value == null) { // tombstone
                null
              } else {
                if (structAndVersion.version == 0) {
                  val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
                  val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V0).asInstanceOf[String]
                  val timestamp = structAndVersion.value.get(VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]
                  OffsetAndMetadata(offset, metadata, timestamp)
                } else if (structAndVersion.version == 1) {
                  val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
                  val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V1).asInstanceOf[String]
                  val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                  // not supported in 0.8.2
                  // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                  OffsetAndMetadata(offset, metadata, commitTimestamp)
                } else if (structAndVersion.version == 2) {
                  val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
                  val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V2).asInstanceOf[String]
                  val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
                  // not supported in 0.8.2
                  // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                  OffsetAndMetadata(offset, metadata, commitTimestamp)
                } else if (structAndVersion.version == 3) {
                  val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
                  val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V3).asInstanceOf[String]
                  val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
                  // not supported in 0.8.2
                  // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                  OffsetAndMetadata(offset, metadata, commitTimestamp)
                } else {
                  throw new IllegalStateException("Unknown offset message version: " + structAndVersion.version)
                }
              }
            }
            private def readMessageValueStruct(buffer: ByteBuffer): MessageValueStructAndVersion = {
              if(buffer == null) { // tombstone
                MessageValueStructAndVersion(null, -1)
              } else {
                val version = buffer.getShort()
                val valueSchema = schemaFor(version).valueSchema
                val value = valueSchema.read(buffer).asInstanceOf[Struct]
                MessageValueStructAndVersion(value, version)
              }
            }
          }
          

          Patch the source: replace this class's code with the version above and it will work.
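          The fix works because every key and value in the __consumer_offsets topic is prefixed with a 2-byte schema version, and the patch registers the newer value schemas (V2/V3) that the original code did not know. A minimal standalone sketch of that version-prefix framing (illustrative only; the names here are made up, not from the project):

```scala
import java.nio.ByteBuffer

// Illustrative sketch (hypothetical names): offset-commit keys and values
// begin with a 2-byte schema version; the version selects how to parse the rest.
object OffsetFraming {
  // Encode a toy value as [version: Int16][offset: Int64][commitTimestamp: Int64].
  def encode(version: Short, offset: Long, commitTs: Long): ByteBuffer = {
    val buf = ByteBuffer.allocate(2 + 8 + 8)
    buf.putShort(version).putLong(offset).putLong(commitTs)
    buf.flip()
    buf
  }

  // Read the leading version short; an unrecognized value here is exactly what
  // produces the "Unknown offset schema version N" error.
  def readVersion(buf: ByteBuffer): Short = buf.getShort()
}
```

          When the decoder meets a version it does not know, the only options are to skip the message or to add the missing schema, which is what the patched class does.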

            2019-08-16 10:02:27 WARN  KafkaOffsetGetter$:89 - Failed to process one of the commit message due to exception. The 'bad' message will be skipped
            java.lang.RuntimeException: Unknown offset schema version 3
                    at com.quantifind.kafka.core.KafkaOffsetGetter$.schemaFor(KafkaOffsetGetter.scala:162)
                    at com.quantifind.kafka.core.KafkaOffsetGetter$.readMessageValueStruct(KafkaOffsetGetter.scala:234)
                    at com.quantifind.kafka.core.KafkaOffsetGetter$.com$quantifind$kafka$core$KafkaOffsetGetter$$readMessageValue(KafkaOffsetGetter.scala:204)
                    at com.quantifind.kafka.core.KafkaOffsetGetter$$anonfun$startOffsetListener$1.apply$mcV$sp(KafkaOffsetGetter.scala:101)
                    at com.quantifind.kafka.core.KafkaOffsetGetter$$anonfun$startOffsetListener$1.apply(KafkaOffsetGetter.scala:87)
                    at com.quantifind.kafka.core.KafkaOffsetGetter$$anonfun$startOffsetListener$1.apply(KafkaOffsetGetter.scala:87)
                    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
                    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
                    at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
                    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
                    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
                    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
                    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
            

            JDK version: jdk1.8.0_181
            Kafka version: kafka_2.12-2.1.1
            Zookeeper version: zookeeper-3.4.14
            KafkaOffsetMonitor launch command:

            java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar \
                 com.quantifind.kafka.offsetapp.OffsetGetterWeb \
                 --offsetStorage kafka \
                 --zk zkServer1:12181,zkServer2:12181 \
                 --port 8088 \
                 --refresh 10.seconds \
                 --retain 2.days
            

            It keeps printing this error, and the web UI shows no produce or consume activity at all. I can't tell whether it's the Scala version or the Kafka version that doesn't match.

            • Same fix as above: replace KafkaOffsetGetter.scala with the patched version posted earlier (it registers the V2/V3 offset schemas), then rebuild.
                      val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
                      val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V0).asInstanceOf[String]
                      val timestamp = structAndVersion.value.get(VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]
                      OffsetAndMetadata(offset, metadata, timestamp)
                    } else if (structAndVersion.version == 1) {
                      val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
                      val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V1).asInstanceOf[String]
                      val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                      // not supported in 0.8.2
                      // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                      OffsetAndMetadata(offset, metadata, commitTimestamp)
                    } else if (structAndVersion.version == 2) {
                      val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
                      val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V2).asInstanceOf[String]
                      val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
                      // not supported in 0.8.2
                      // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                      OffsetAndMetadata(offset, metadata, commitTimestamp)
                    } else if (structAndVersion.version == 3) {
                      val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
                      val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V3).asInstanceOf[String]
                      val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
                      // not supported in 0.8.2
                      // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                      OffsetAndMetadata(offset, metadata, commitTimestamp)
                    } else {
                      throw new IllegalStateException("Unknown offset message version: " + structAndVersion.version)
                    }
                  }
                }
                private def readMessageValueStruct(buffer: ByteBuffer): MessageValueStructAndVersion = {
                  if(buffer == null) { // tombstone
                    MessageValueStructAndVersion(null, -1)
                  } else {
                    val version = buffer.getShort()
                    val valueSchema = schemaFor(version).valueSchema
                    val value = valueSchema.read(buffer).asInstanceOf[Struct]
                    MessageValueStructAndVersion(value, version)
                  }
                }
              }
              

              Modify the source code.

                2019-07-16 17:30:21 WARN  KafkaOffsetGetter$:89 - Failed to process one of the commit message due to exception. The 'bad' message will be skipped
                java.lang.RuntimeException: Unknown offset schema version 3
                

                What does this error mean?
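The version in that message is the 2-byte prefix of every record in the `__consumer_offsets` topic: newer brokers write offset-commit values with schema version 3, which adds a `leader_epoch` field that the stock decoder does not know, so `schemaFor(3)` throws. As a rough, self-contained sketch of the layout (field order taken from `OFFSET_COMMIT_VALUE_SCHEMA_V3` in the code below; the class name, buffer sizing, and sample values are invented for illustration):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical round-trip of an offset-commit *value* in the v3 layout
// (offset, leader_epoch, metadata, commit_timestamp), preceded by the
// 2-byte schema version that readMessageValueStruct() reads first.
public class OffsetValueV3Demo {
    static ByteBuffer encode(long offset, int leaderEpoch, String metadata, long commitTs) {
        byte[] meta = metadata.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + 8 + 4 + 2 + meta.length + 8);
        buf.putShort((short) 3);           // schema version: a decoder without a v3 schema fails here
        buf.putLong(offset);
        buf.putInt(leaderEpoch);           // new in v3
        buf.putShort((short) meta.length); // STRING fields are int16-length-prefixed
        buf.put(meta);
        buf.putLong(commitTs);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer buf = encode(42L, 7, "", 1563262221000L);
        short version = buf.getShort();
        long offset = buf.getLong();
        int leaderEpoch = buf.getInt();
        byte[] meta = new byte[buf.getShort()];
        buf.get(meta);
        long commitTs = buf.getLong();
        System.out.println(version + " " + offset + " " + leaderEpoch + " " + commitTs);
        // prints: 3 42 7 1563262221000
    }
}
```

The fix in the answer below follows the same idea: register a v3 value schema in the version map so the dispatch no longer falls through to the "Unknown offset schema version" branch.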

                • package com.quantifind.kafka.core
                  import java.nio.ByteBuffer
                  import com.quantifind.kafka.OffsetGetter.OffsetInfo
                  import com.quantifind.kafka.OffsetGetter
                  import com.quantifind.utils.ZkUtilsWrapper
                  import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
                  import kafka.common.{OffsetAndMetadata, TopicAndPartition}
                  import kafka.consumer.{ConsumerConnector, KafkaStream}
                  import kafka.message.MessageAndMetadata
                  import kafka.utils.Logging
                  import org.I0Itec.zkclient.ZkClient
                  import scala.collection._
                  import scala.concurrent.ExecutionContext.Implicits.global
                  import scala.concurrent.Future
                  import scala.util.control.NonFatal
                  class KafkaOffsetGetter(theZkClient: ZkClient, zkUtils: ZkUtilsWrapper = new ZkUtilsWrapper) extends OffsetGetter {
                    import KafkaOffsetGetter._
                    override val zkClient = theZkClient
                    override def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] = {
                      try {
                        zkUtils.getLeaderForPartition(zkClient, topic, pid) match {
                          case Some(bid) =>
                            val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(bid))
                            consumerOpt flatMap { consumer =>
                                val topicAndPartition = TopicAndPartition(topic, pid)
                                offsetMap.get(GroupTopicPartition(group, topicAndPartition)) map { offsetMetaData =>
                                  val request =
                                    OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
                                  val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
                                  OffsetInfo(group = group,
                                    topic = topic,
                                    partition = pid,
                                    offset = offsetMetaData.offset,
                                    logSize = logSize,
                                    owner = Some("NA"))
                                }
                            }
                          case None =>
                            error("No broker for partition %s - %s".format(topic, pid))
                            None
                        }
                      } catch {
                        case NonFatal(t) =>
                          error(s"Could not parse partition info. group: [$group] topic: [$topic]", t)
                          None
                      }
                    }
                    override def getGroups: Seq[String] = {
                      topicAndGroups.groupBy(_.group).keySet.toSeq
                    }
                    override def getTopicList(group: String): List[String] = {
                      topicAndGroups.filter(_.group == group).groupBy(_.topic).keySet.toList
                    }
                    override def getTopicMap: Map[String, scala.Seq[String]] = {
                      topicAndGroups.groupBy(_.topic).mapValues(_.map(_.group).toSeq)
                    }
                    override def getActiveTopicMap: Map[String, Seq[String]] = {
                      getTopicMap
                    }
                  }
                  object KafkaOffsetGetter extends Logging {
                    val ConsumerOffsetTopic = "__consumer_offsets"
                    val offsetMap: mutable.Map[GroupTopicPartition, OffsetAndMetadata] = mutable.HashMap()
                    val topicAndGroups: mutable.Set[TopicAndGroup] = mutable.HashSet()
                    def startOffsetListener(consumerConnector: ConsumerConnector) = {
                      Future {
                        try {
                          logger.info("Starting Kafka offset topic listener")
                          val offsetMsgStream: KafkaStream[Array[Byte], Array[Byte]] = consumerConnector
                            .createMessageStreams(Map(ConsumerOffsetTopic -> 1))
                            .get(ConsumerOffsetTopic).map(_.head) match {
                            case Some(s) => s
                            case None => throw new IllegalStateException("Cannot create a consumer stream on offset topic ")
                          }
                          val it = offsetMsgStream.iterator()
                          while (true) {
                            try {
                              val offsetMsg: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
                              val commitKey: GroupTopicPartition = readMessageKey(ByteBuffer.wrap(offsetMsg.key()))
                              val commitValue: OffsetAndMetadata = readMessageValue(ByteBuffer.wrap(offsetMsg.message()))
                              info("Processed commit message: " + commitKey + " => " + commitValue)
                              offsetMap += (commitKey -> commitValue)
                              topicAndGroups += TopicAndGroup(commitKey.topicPartition.topic, commitKey.group)
                              info(s"topicAndGroups = $topicAndGroups")
                            } catch {
                              case e: RuntimeException =>
                                // sometimes offsetMsg.key() || offsetMsg.message() throws NPE
                                warn("Failed to process one of the commit message due to exception. The 'bad' message will be skipped", e)
                            }
                          }
                        } catch {
                          case e: Throwable =>
                            fatal("Offset topic listener aborted due to unexpected exception", e)
                            System.exit(1)
                        }
                      }
                    }
                    // massive code stealing from kafka.server.OffsetManager
                    import java.nio.ByteBuffer
                    import org.apache.kafka.common.protocol.types.Type.{INT32, INT64, STRING}
                    import org.apache.kafka.common.protocol.types.{Field, Schema, Struct}
                    private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema)
                    private val OFFSET_COMMIT_KEY_SCHEMA_V0 = new Schema(new Field("group", STRING),
                                                                         new Field("topic", STRING),
                                                                         new Field("partition", INT32))
                    private val KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("group")
                    private val KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("topic")
                    private val KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("partition")
                    private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
                                                                           new Field("metadata", STRING, "Associated metadata.", ""),
                                                                           new Field("timestamp", INT64))
                    private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
                                                                           new Field("metadata", STRING, "Associated metadata.", ""),
                                                                           new Field("commit_timestamp", INT64),
                                                                           new Field("expire_timestamp", INT64))
                    private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
                                                                           new Field("metadata", STRING, "Associated metadata.", ""),
                                                                           new Field("commit_timestamp", INT64))
                    private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(new Field("offset", INT64),
                                                                           new Field("leader_epoch", INT32),
                                                                           new Field("metadata", STRING, "Associated metadata.", ""),
                                                                           new Field("commit_timestamp", INT64))
                    private val VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
                    private val VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
                    private val VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
                    private val VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
                    private val VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
                    private val VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
                    private val VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
                    private val VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
                    private val VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
                    private val VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
                    private val VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
                    private val VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
                    private val VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
                    // private val VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
                    // map of versions to schemas
                    private val OFFSET_SCHEMAS = Map(0 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V0),
                                                     1 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V1),
                                                     2 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V2),
                                                     3 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V3))
                    private def schemaFor(version: Int) = {
                      val schemaOpt = OFFSET_SCHEMAS.get(version)
                      schemaOpt match {
                        case Some(schema) => schema
                        case _ => throw new RuntimeException("Unknown offset schema version " + version)
                      }
                    }
                    case class MessageValueStructAndVersion(value: Struct, version: Short)
                    case class TopicAndGroup(topic: String, group: String)
                    case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {
                      def this(group: String, topic: String, partition: Int) =
                        this(group, new TopicAndPartition(topic, partition))
                      override def toString =
                        "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
                    }
                    /**
                     * Decodes the offset messages' key
                     *
                     * @param buffer input byte-buffer
                     * @return an GroupTopicPartition object
                     */
                    private def readMessageKey(buffer: ByteBuffer): GroupTopicPartition = {
                      val version = buffer.getShort()
                      val keySchema = schemaFor(version).keySchema
                      val key = keySchema.read(buffer).asInstanceOf[Struct]
                      val group = key.get(KEY_GROUP_FIELD).asInstanceOf[String]
                      val topic = key.get(KEY_TOPIC_FIELD).asInstanceOf[String]
                      val partition = key.get(KEY_PARTITION_FIELD).asInstanceOf[Int]
                      GroupTopicPartition(group, TopicAndPartition(topic, partition))
                    }
                    /**
                     * Decodes the offset messages' payload and retrieves offset and metadata from it
                     *
                     * @param buffer input byte-buffer
                     * @return an offset-metadata object from the message
                     */
                    private def readMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
                      val structAndVersion = readMessageValueStruct(buffer)
                      if (structAndVersion.value == null) { // tombstone
                        null
                      } else {
                        if (structAndVersion.version == 0) {
                          val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
                          val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V0).asInstanceOf[String]
                          val timestamp = structAndVersion.value.get(VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]
                          OffsetAndMetadata(offset, metadata, timestamp)
                        } else if (structAndVersion.version == 1) {
                          val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
                          val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V1).asInstanceOf[String]
                          val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                          // not supported in 0.8.2
                          // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                          OffsetAndMetadata(offset, metadata, commitTimestamp)
                        } else if (structAndVersion.version == 2) {
                          val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
                          val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V2).asInstanceOf[String]
                          val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
                          // not supported in 0.8.2
                          // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                          OffsetAndMetadata(offset, metadata, commitTimestamp)
                        } else if (structAndVersion.version == 3) {
                          val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
                          val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V3).asInstanceOf[String]
                          val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
                          // not supported in 0.8.2
                          // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                          OffsetAndMetadata(offset, metadata, commitTimestamp)
                        } else {
                          throw new IllegalStateException("Unknown offset message version: " + structAndVersion.version)
                        }
                      }
                    }
                    private def readMessageValueStruct(buffer: ByteBuffer): MessageValueStructAndVersion = {
                      if(buffer == null) { // tombstone
                        MessageValueStructAndVersion(null, -1)
                      } else {
                        val version = buffer.getShort()
                        val valueSchema = schemaFor(version).valueSchema
                        val value = valueSchema.read(buffer).asInstanceOf[Struct]
                        MessageValueStructAndVersion(value, version)
                      }
                    }
                  }
                  

                    A question: does the consumer side maintain an offset of its own?

                    • What I meant earlier is: the business logic succeeded but the commit step failed, and there is nothing you can do about that step itself; the server will not update the offset. If you also keep a backup offset locally, then on the next consumption you can compare it against the offsets of the fetched messages and process only the not-yet-consumed part.

                        • Fair enough. We have run this in production for four years: commit the offset first, then process the business logic. For heavy workloads I lower the concurrency and cap the number of offsets committed per batch, so even if the process crashes outright only a handful of messages can be lost (an outright crash has never happened in four years, which is to say we have never lost a message). As long as the shutdown is graceful, there is no problem at all.

                            • Right, my earlier reasoning also had a flaw: if a consumer keeps failing on commit retries, the consumer group should evict that consumer and rebalance. By the way, have you used Kafka for transactions involving money, such as payments? It feels like a single mistake in that kind of transaction would draw a complaint...
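The commit-ordering trade-off in the discussion above can be simulated without a broker. This is an illustrative-only sketch (no real Kafka API; the class, method names, and crash point are invented): commit-before-process risks losing the in-flight message on a crash, while process-before-commit risks handling it twice.

```java
import java.util.ArrayList;
import java.util.List;

// Simulates a consumer that crashes right in the middle of handling message
// `crashIdx`, then restarts and resumes from the last committed offset.
public class CommitOrderDemo {
    static List<Integer> run(boolean commitFirst, int numMessages, int crashIdx) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0;                     // next offset to read after a restart
        for (int i = 0; i < numMessages; i++) {
            if (commitFirst) {
                committed = i + 1;             // offset committed before processing
                if (i == crashIdx) break;      // crash: message i committed but never processed
                processed.add(i);
            } else {
                processed.add(i);
                if (i == crashIdx) break;      // crash: message i processed but not committed
                committed = i + 1;             // offset committed after processing
            }
        }
        for (int i = committed; i < numMessages; i++) processed.add(i); // restart
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("at-most-once:  " + run(true, 5, 2));   // message 2 is lost
        System.out.println("at-least-once: " + run(false, 5, 2));  // message 2 handled twice
    }
}
```

This is why the approach above (commit first, small commit batches, graceful shutdown) bounds losses to at most one batch, while commit-after-process trades that for possible duplicates, which payment-style systems must deduplicate with an idempotency key.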

                                After starting against Kafka 1.1.0, the page opens and the menus appear, but the page has no content and clicking any menu does nothing; there are no errors in the backend logs. What is going on?

                                Hello, my Kafka version is kafka_2.12-1.1.0. I deployed following your steps. When I click:

                                Kafka Cluster Visualization
                                it just shows "Loading ..." forever (I set up SASL authentication yesterday; does that need extra handling?)
                                The topic list works when I click it and I can see all the topics;
                                clicking anything else shows nothing.
                                I also checked the logs and saw a NullPointerException:

                                INFO  ConsumerFetcherManager:68 - [ConsumerFetcherManager-1541677023239] Added fetcher for partitions ArrayBuffer()
                                2018-11-08 19:38:51 WARN  ConsumerFetcherManager$LeaderFinderThread:89 - [KafkaOffsetMonitor-1541677023090_server1.hzguode.com-1541677023185-3dacb5b0-leader-finder-thread], Failed to find leader for Set([__consumer_offsets,16], [__consumer_offsets,2], [__consumer_offsets,39], [__consumer_offsets,15], [__consumer_offsets,33], [__consumer_offsets,38], [__consumer_offsets,9], [__consumer_offsets,7], [__consumer_offsets,31], [__consumer_offsets,48], [__consumer_offsets,24], [__consumer_offsets,19], [__consumer_offsets,11], [__consumer_offsets,22], [__consumer_offsets,26], [__consumer_offsets,17], [__consumer_offsets,10], [__consumer_offsets,35], [__consumer_offsets,36], [__consumer_offsets,34], [__consumer_offsets,49], [__consumer_offsets,44], [__consumer_offsets,23], [__consumer_offsets,40], [__consumer_offsets,47], [__consumer_offsets,45], [__consumer_offsets,43], [__consumer_offsets,46], [__consumer_offsets,37], [__consumer_offsets,28], [__consumer_offsets,27], [__consumer_offsets,25], [__consumer_offsets,6], [__consumer_offsets,5], [__consumer_offsets,3], [__consumer_offsets,18], [__consumer_offsets,14], [__consumer_offsets,41], [__consumer_offsets,42], [__consumer_offsets,4], [__consumer_offsets,12], [__consumer_offsets,21], [__consumer_offsets,30], [__consumer_offsets,1], [__consumer_offsets,32], [__consumer_offsets,13], [__consumer_offsets,8], [__consumer_offsets,20], [__consumer_offsets,0], [__consumer_offsets,29])
                                java.lang.NullPointerException
                                        at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)
                                        at kafka.cluster.Broker.connectionString(Broker.scala:62)
                                        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
                                        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
                                        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
                                        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
                                        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
                                        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
                                        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
                                        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
                                        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
                                        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
                                        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
                                

                                Could you take a look when you have a moment? I have been at this all morning and my head hurts...

                                After entering a topic it says "Unable to find Active Consumers", but there actually is a consumer: I have tested sending and receiving many messages, and the consumer info is present in ZooKeeper.
                                kafka-manager can see it, but KafkaOffsetMonitor cannot. What is going on?

                                If the Kafka cluster to be monitored uses SSL certificates, can KafkaOffsetMonitor still monitor it normally, or does the startup differ? Thank you!

                                Hello, I ran KafkaOffsetMonitor following your steps, but the UI shows no content, while the server's stdout.log prints a lot of output. The Kafka version is 2.11-0.10.2.0. Here is my command:
                                java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --offsetStorage kafka --zk zk1:2181,zk2:2181,zk3:2181/log/kafka --port 8090 --refresh 10.seconds --retain 2.days 1>mobile-logs/stdout.log 2>mobile-logs/stderr.log &