KafkaOffsetMonitor:监控消费者和延迟的队列

    原创
半兽人 发表于: 2015-03-10   最后更新时间: 2016-10-14  

一个小应用程序来监视kafka消费者的进度和它们的延迟的队列。

KafkaOffsetMonitor是用来实时监控Kafka集群中的consumer以及在队列中的位置(偏移量)。

你可以查看当前的消费者组,每个topic队列的所有partition的消费情况。可以很快地知道每个partition中的消息是否很快被消费以及相应的队列消息增长速度等信息。这些可以debug kafka的producer和consumer,你完全知道你的系统将会发生什么。

这个web管理平台保留的partition offset和consumer滞后的历史数据(具体数据保存多少天我们可以在启动的时候配置),所以你可以很轻易了解这几天consumer消费情况。

KafkaOffsetMonitor这款软件是用Scala代码编写的,消息等历史数据是保存在名为offsetapp.db数据库文件中,该数据库是SQLLite文件,非常的轻量级。虽然我们可以在启动KafkaOffsetMonitor程序的时候指定数据更新的频率和数据保存的时间,但是不建议更新很频繁,或者保存大量的数据,因为在KafkaOffsetMonitor图形展示的时候会出现图像展示过慢,或者是直接导致内存溢出了。

所有的关于消息的偏移量、kafka集群的数量等信息都是从Zookeeper中获取到的,日志大小是通过计算得到的。

消费者组列表

screenshot

消费组的topic列表

screenshot

图中参数含义解释如下:

topic:创建时topic名称
partition:分区编号
offset:表示该parition已经消费了多少条message
logSize:表示该partition已经写了多少条message
Lag:表示有多少条message没有被消费。
Owner:表示消费者
Created:该partition创建时间
Last Seen:消费状态刷新最新时间。

topic的历史位置

screenshot

Offset存储位置

kafka能灵活地管理offset,可以选择任意存储和格式来保存offset。KafkaOffsetMonitor目前支持以下流行的存储格式。

  • kafka0.8版本以前,offset默认存储在zookeeper中(基于Zookeeper)
  • kafka0.9版本以后,offset默认存储在内部的topic中(基于Kafka内部的topic)
  • Storm Kafka Spout(默认情况下基于Zookeeper)

KafkaOffsetMonitor每个运行的实例只能支持单一类型的存储格式。

下载

可以到github下载KafkaOffsetMonitor源码。

https://github.com/quantifind/KafkaOffsetMonitor

编译KafkaOffsetMonitor命令:

sbt/sbt assembly

不过不建议你自己去下载,因为编译的jar包里引入的都是外部的css和js,所以打开必须联网,都是国外的地址,你编译的时候还要修改js路径,我已经搞定了,你直接下载就好了。

百度云盘:https://pan.baidu.com/s/1kUZJrCV

启动

编译完之后,将会在KafkaOffsetMonitor根目录下生成一个类似KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar的jar文件。这个文件包含了所有的依赖,我们可以直接启动它:

java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar \
     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
     --offsetStorage kafka \
     --zk zk-server1,zk-server2 \
     --port 8080 \
     --refresh 10.seconds \
     --retain 2.days

启动方式2,创建脚本,因为您可能不是一个kafka集群。用脚本可以启动多个。

vim mobile_start_en.sh
        nohup java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb 
       --offsetStorage kafka
       --zk 127.0.0.1:2181  
       --port 8080      
       --refresh 10.seconds      
       --retain 2.days 1>mobile-logs/stdout.log 2>mobile-logs/stderr.log &

各个参数的含义:

  • offsetStorage:有效的选项是"zookeeper","kafka","storm"。0.9版本以后,offset存储的位置在kafka。
  • zk: zookeeper的地址
  • prot 端口号
  • refresh 刷新频率,更新到DB。
  • retain 保留DB的时间
  • dbName 在哪里存储记录(默认'offsetapp')


您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容




上一条: kafka监控
下一条: Kafka Manager

  • 我在操作消费kafka队列时出现 numRecords must not be negative 该如何解决?

    • java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
          at scala.Predef$.require(Predef.scala:233)
          at org.apache.spark.streaming.scheduler.StreamInputInfo.(InputInfoTracker.scala:38)
          at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:425)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:345)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:343)
          at scala.Option.orElse(Option.scala:257)
          at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:340)
          at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:425)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:345)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:343)
          at scala.Option.orElse(Option.scala:257)
          at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:340)
          at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:425)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:345)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:343)
          at scala.Option.orElse(Option.scala:257)
          at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:340)
          at org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:35)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:425)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:345)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:343)
          at scala.Option.orElse(Option.scala:257)
          at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:340)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2$$anonfun$apply$29.apply(DStream.scala:934)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2$$anonfun$apply$29.apply(DStream.scala:933)
          at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
          at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
          at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
          at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
          at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
          at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2.apply(DStream.scala:933)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$2.apply(DStream.scala:909)
          at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
          at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
          at org.apache.spark.SparkContext.withScope(SparkContext.scala:716)
          at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
          at org.apache.spark.streaming.dstream.DStream.slice(DStream.scala:909)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$1.apply(DStream.scala:903)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$slice$1.apply(DStream.scala:903)
          at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
          at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
          at org.apache.spark.SparkContext.withScope(SparkContext.scala:716)
          at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
          at org.apache.spark.streaming.dstream.DStream.slice(DStream.scala:902)
          at org.apache.spark.streaming.dstream.WindowedDStream.compute(WindowedDStream.scala:65)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:425)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:345)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:343)
          at scala.Option.orElse(Option.scala:257)
          at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:340)
          at org.apache.spark.streaming.dstream.UnionDStream$$anonfun$compute$1.apply(UnionDStream.scala:43)
          at org.apache.spark.streaming.dstream.UnionDStream$$anonfun$compute$1.apply(UnionDStream.scala:43)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
          at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
          at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
          at org.apache.spark.streaming.dstream.UnionDStream.compute(UnionDStream.scala:43)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:425)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:345)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:343)
          at scala.Option.orElse(Option.scala:257)
          at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:340)
          at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
          at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.immutable.List.foreach(List.scala:318)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
          at scala.collection.AbstractTraversable.map(Traversable.scala:105)
          at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:425)
          at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:345)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:343)
          at scala.Option.orElse(Option.scala:257)
          at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:340)
          at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:425)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:345)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:343)
          at scala.Option.orElse(Option.scala:257)
          at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:340)
          at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
          at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.immutable.List.foreach(List.scala:318)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
          at scala.collection.AbstractTraversable.map(Traversable.scala:105)
          at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:425)
          at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:345)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:343)
          at scala.Option.orElse(Option.scala:257)
          at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:340)
          at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:425)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:345)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:343)
          at scala.Option.orElse(Option.scala:257)
          at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:340)
          at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:351)
          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:350)
          at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:425)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:345)
          at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:343)
          at scala.Option.orElse(Option.scala:257)
          at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:340)
          at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
          at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
          at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
          at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
          at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
          at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
          at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
          at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
          at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
          at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
          at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
          at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
          at scala.util.Try$.apply(Try.scala:161)
          at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
          at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
          at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
          at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
          at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      20/01/02 11:17:02 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
      

        1、我用的kafka自带的zookeeper,大概过了一天,zookeeper进程就停掉了。找不到原因,烦请解答。

        2、kafka开启了SASL和ACL认证,我在执行监控jar包前:export了环境变量:-Djava.security.auth.login.config=/home/hadoop/kafka/kafka_2.11-1.0.1/config/kafka_client_jaas.conf

        现在报错:

        2019-12-24 14:50:45 INFO  ConsumerFetcherManager:68 - [ConsumerFetcherManager-1577170211362] Added fetcher for partitions ArrayBuffer()
        ^C2019-12-24 14:50:46 INFO  ContextHandler:843 - stopped o.e.j.s.ServletContextHandler{/,jar:file:/home/hadoop/kafka/KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar!/offsetapp}
        2019-12-24 14:50:46 WARN  ConsumerFetcherManager$LeaderFinderThread:89 - [KafkaOffsetMonitor-1577170211257_UIP-01-1577170211345-170696dd-leader-finder-thread], Failed to find leader for Set([__consumer_offsets,16], [__consumer_offsets,2], [__consumer_offsets,39], [__consumer_offsets,15], [__consumer_offsets,33], [__consumer_offsets,38], [__consumer_offsets,9], [__consumer_offsets,7], [__consumer_offsets,31], [__consumer_offsets,48], [__consumer_offsets,24], [__consumer_offsets,19], [__consumer_offsets,11], [__consumer_offsets,22], [__consumer_offsets,26], [__consumer_offsets,17], [__consumer_offsets,10], [__consumer_offsets,35], [__consumer_offsets,36], [__consumer_offsets,34], [__consumer_offsets,49], [__consumer_offsets,44], [__consumer_offsets,23], [__consumer_offsets,40], [__consumer_offsets,47], [__consumer_offsets,45], [__consumer_offsets,43], [__consumer_offsets,46], [__consumer_offsets,37], [__consumer_offsets,28], [__consumer_offsets,27], [__consumer_offsets,25], [__consumer_offsets,6], [__consumer_offsets,5], [__consumer_offsets,3], [__consumer_offsets,18], [__consumer_offsets,14], [__consumer_offsets,41], [__consumer_offsets,42], [__consumer_offsets,4], [__consumer_offsets,12], [__consumer_offsets,21], [__consumer_offsets,30], [__consumer_offsets,1], [__consumer_offsets,32], [__consumer_offsets,13], [__consumer_offsets,8], [__consumer_offsets,20], [__consumer_offsets,0], [__consumer_offsets,29])
        java.lang.NullPointerException
                at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)
                at kafka.cluster.Broker.connectionString(Broker.scala:62)
                at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
                at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
                at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
                at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
                at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
                at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
                at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
                at scala.collection.AbstractTraversable.map(Traversable.scala:105)
                at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
                at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
                at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
        

        以上,烦请解答。感谢

        你好,github上下载的源码按照你说的改完了那一个类,怎么用sbt/sbt assembly命令重新打包成jar文件啊,你那里有修改过的jar吗

        大佬!kafkaMonitor的数据只能看到topic,其他的都没有,cmd命令能查看到,这是怎么回事

        • WARN  KafkaOffsetGetter$:89 - Failed to process one of the commit message due to exception. The 'bad' message will be skipped
          java.lang.RuntimeException: Unknown offset schema version 2
                  at com.quantifind.kafka.core.KafkaOffsetGetter$.schemaFor(KafkaOffsetGetter.scala:162)
                  at com.quantifind.kafka.core.KafkaOffsetGetter$.com$quantifind$kafka$core$KafkaOffsetGetter$$readMessageKey(KafkaOffsetGetter.scala:187)
                  at com.quantifind.kafka.core.KafkaOffsetGetter$$anonfun$startOffsetListener$1.apply$mcV$sp(KafkaOffsetGetter.scala:100)
                  at com.quantifind.kafka.core.KafkaOffsetGetter$$anonfun$startOffsetListener$1.apply(KafkaOffsetGetter.scala:87)
                  at com.quantifind.kafka.core.KafkaOffsetGetter$$anonfun$startOffsetListener$1.apply(KafkaOffsetGetter.scala:87)
                  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
                  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
                  at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
                  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
                  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
                  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
                  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
          

          用的是2.2.0的版本,消费者启动后出现这个错误,用zk储存位置启动时,出现消费者组,但不是我创建的,kafka储存位置启动时还是什么都没有。zk和kafka一直显示的只有topic

            • package com.quantifind.kafka.core
              import java.nio.ByteBuffer
              import com.quantifind.kafka.OffsetGetter.OffsetInfo
              import com.quantifind.kafka.OffsetGetter
              import com.quantifind.utils.ZkUtilsWrapper
              import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
              import kafka.common.{OffsetAndMetadata, TopicAndPartition}
              import kafka.consumer.{ConsumerConnector, KafkaStream}
              import kafka.message.MessageAndMetadata
              import kafka.utils.Logging
              import org.I0Itec.zkclient.ZkClient
              import scala.collection._
              import scala.concurrent.ExecutionContext.Implicits.global
              import scala.concurrent.Future
              import scala.util.control.NonFatal
              class KafkaOffsetGetter(theZkClient: ZkClient, zkUtils: ZkUtilsWrapper = new ZkUtilsWrapper) extends OffsetGetter {
                import KafkaOffsetGetter._
                override val zkClient = theZkClient
                override def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] = {
                  try {
                    zkUtils.getLeaderForPartition(zkClient, topic, pid) match {
                      case Some(bid) =>
                        val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(bid))
                        consumerOpt flatMap { consumer =>
                            val topicAndPartition = TopicAndPartition(topic, pid)
                            offsetMap.get(GroupTopicPartition(group, topicAndPartition)) map { offsetMetaData =>
                              val request =
                                OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
                              val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
                              OffsetInfo(group = group,
                                topic = topic,
                                partition = pid,
                                offset = offsetMetaData.offset,
                                logSize = logSize,
                                owner = Some("NA"))
                            }
                        }
                      case None =>
                        error("No broker for partition %s - %s".format(topic, pid))
                        None
                    }
                  } catch {
                    case NonFatal(t) =>
                      error(s"Could not parse partition info. group: [$group] topic: [$topic]", t)
                      None
                  }
                }
                override def getGroups: Seq[String] = {
                  topicAndGroups.groupBy(_.group).keySet.toSeq
                }
                override def getTopicList(group: String): List[String] = {
                  topicAndGroups.filter(_.group == group).groupBy(_.topic).keySet.toList
                }
                override def getTopicMap: Map[String, scala.Seq[String]] = {
                  topicAndGroups.groupBy(_.topic).mapValues(_.map(_.group).toSeq)
                }
                override def getActiveTopicMap: Map[String, Seq[String]] = {
                  getTopicMap
                }
              }
              object KafkaOffsetGetter extends Logging {
                val ConsumerOffsetTopic = "__consumer_offsets"
                val offsetMap: mutable.Map[GroupTopicPartition, OffsetAndMetadata] = mutable.HashMap()
                val topicAndGroups: mutable.Set[TopicAndGroup] = mutable.HashSet()
                def startOffsetListener(consumerConnector: ConsumerConnector) = {
                  Future {
                    try {
                      logger.info("Staring Kafka offset topic listener")
                      val offsetMsgStream: KafkaStream[Array[Byte], Array[Byte]] = consumerConnector
                        .createMessageStreams(Map(ConsumerOffsetTopic -> 1))
                        .get(ConsumerOffsetTopic).map(_.head) match {
                        case Some(s) => s
                        case None => throw new IllegalStateException("Cannot create a consumer stream on offset topic ")
                      }
                      val it = offsetMsgStream.iterator()
                      while (true) {
                        try {
                          val offsetMsg: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
                          val commitKey: GroupTopicPartition = readMessageKey(ByteBuffer.wrap(offsetMsg.key()))
                          val commitValue: OffsetAndMetadata = readMessageValue(ByteBuffer.wrap(offsetMsg.message()))
                          info("Processed commit message: " + commitKey + " => " + commitValue)
                          offsetMap += (commitKey -> commitValue)
                          topicAndGroups += TopicAndGroup(commitKey.topicPartition.topic, commitKey.group)
                          info(s"topicAndGroups = $topicAndGroups")
                        } catch {
                          case e: RuntimeException =>
                            // sometimes offsetMsg.key() || offsetMsg.message() throws NPE
                            warn("Failed to process one of the commit message due to exception. The 'bad' message will be skipped", e)
                        }
                      }
                    } catch {
                      case e: Throwable =>
                        fatal("Offset topic listener aborted dur to unexpected exception", e)
                        System.exit(1)
                    }
                  }
                }
                // massive code stealing from kafka.server.OffsetManager
                import java.nio.ByteBuffer
                import org.apache.kafka.common.protocol.types.Type.{INT32, INT64, STRING}
                import org.apache.kafka.common.protocol.types.{Field, Schema, Struct}
                private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema)
                private val OFFSET_COMMIT_KEY_SCHEMA_V0 = new Schema(new Field("group", STRING),
                                                                     new Field("topic", STRING),
                                                                     new Field("partition", INT32))
                private val KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("group")
                private val KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("topic")
                private val KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("partition")
                private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
                                                                       new Field("metadata", STRING, "Associated metadata.", ""),
                                                                       new Field("timestamp", INT64))
                private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
                                                                       new Field("metadata", STRING, "Associated metadata.", ""),
                                                                       new Field("commit_timestamp", INT64),
                                                                       new Field("expire_timestamp", INT64))
                private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
                                                                       new Field("metadata", STRING, "Associated metadata.", ""),
                                                                       new Field("commit_timestamp", INT64))
                private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(new Field("offset", INT64),
                                                                       new Field("leader_epoch", INT32),
                                                                       new Field("metadata", STRING, "Associated metadata.", ""),
                                                                       new Field("commit_timestamp", INT64))
                private val VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
                private val VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
                private val VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
                private val VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
                private val VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
                private val VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
                private val VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
                private val VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
                private val VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
                private val VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
                private val VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
                private val VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
                private val VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
                // private val VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
                // map of versions to schemas
                private val OFFSET_SCHEMAS = Map(0 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V0),
                                                 1 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V1),
                  2 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V2),
                  3 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V3))
                private def schemaFor(version: Int) = {
                  val schemaOpt = OFFSET_SCHEMAS.get(version)
                  schemaOpt match {
                    case Some(schema) => schema
                    case _ => throw new RuntimeException("Unknown offset schema version " + version)
                  }
                }
                case class MessageValueStructAndVersion(value: Struct, version: Short)
                case class TopicAndGroup(topic: String, group: String)
                case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {
                  def this(group: String, topic: String, partition: Int) =
                    this(group, new TopicAndPartition(topic, partition))
                  override def toString =
                    "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
                }
                /**
                 * Decodes the offset messages' key
                 *
                 * @param buffer input byte-buffer
                 * @return an GroupTopicPartition object
                 */
                private def readMessageKey(buffer: ByteBuffer): GroupTopicPartition = {
                  val version = buffer.getShort()
                  val keySchema = schemaFor(version).keySchema
                  val key = keySchema.read(buffer).asInstanceOf[Struct]
                  val group = key.get(KEY_GROUP_FIELD).asInstanceOf[String]
                  val topic = key.get(KEY_TOPIC_FIELD).asInstanceOf[String]
                  val partition = key.get(KEY_PARTITION_FIELD).asInstanceOf[Int]
                  GroupTopicPartition(group, TopicAndPartition(topic, partition))
                }
                /**
                 * Decodes the offset messages' payload and retrieves offset and metadata from it
                 *
                 * @param buffer input byte-buffer
                 * @return an offset-metadata object from the message
                 */
                private def readMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
                  val structAndVersion = readMessageValueStruct(buffer)
                  if (structAndVersion.value == null) { // tombstone
                    null
                  } else {
                    if (structAndVersion.version == 0) {
                      val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
                      val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V0).asInstanceOf[String]
                      val timestamp = structAndVersion.value.get(VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]
                      OffsetAndMetadata(offset, metadata, timestamp)
                    } else if (structAndVersion.version == 1) {
                      val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
                      val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V1).asInstanceOf[String]
                      val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                      // not supported in 0.8.2
                      // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                      OffsetAndMetadata(offset, metadata, commitTimestamp)
                    } else if (structAndVersion.version == 2) {
                      val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
                      val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V2).asInstanceOf[String]
                      val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
                      // not supported in 0.8.2
                      // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                      OffsetAndMetadata(offset, metadata, commitTimestamp)
                    }else if(structAndVersion.version == 3){
                      val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
                      val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V3).asInstanceOf[String]
                      val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
                      // not supported in 0.8.2
                      // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                      OffsetAndMetadata(offset, metadata, commitTimestamp)
                    } else {
                      throw new IllegalStateException("Unknown offset message version: " + structAndVersion.version)
                    }
                  }
                }
                private def readMessageValueStruct(buffer: ByteBuffer): MessageValueStructAndVersion = {
                  if(buffer == null) { // tombstone
                    MessageValueStructAndVersion(null, -1)
                  } else {
                    val version = buffer.getShort()
                    val valueSchema = schemaFor(version).valueSchema
                    val value = valueSchema.read(buffer).asInstanceOf[Struct]
                    MessageValueStructAndVersion(value, version)
                  }
                }
              }
              

              改下源码,这个类的代码改成这样就好了

                2019-08-16 10:02:27 WARN  KafkaOffsetGetter$:89 - Failed to process one of the commit message due to exception. The 'bad' message will be skipped
                java.lang.RuntimeException: Unknown offset schema version 3
                        at com.quantifind.kafka.core.KafkaOffsetGetter$.schemaFor(KafkaOffsetGetter.scala:162)
                        at com.quantifind.kafka.core.KafkaOffsetGetter$.readMessageValueStruct(KafkaOffsetGetter.scala:234)
                        at com.quantifind.kafka.core.KafkaOffsetGetter$.com$quantifind$kafka$core$KafkaOffsetGetter$$readMessageValue(KafkaOffsetGetter.scala:204)
                        at com.quantifind.kafka.core.KafkaOffsetGetter$$anonfun$startOffsetListener$1.apply$mcV$sp(KafkaOffsetGetter.scala:101)
                        at com.quantifind.kafka.core.KafkaOffsetGetter$$anonfun$startOffsetListener$1.apply(KafkaOffsetGetter.scala:87)
                        at com.quantifind.kafka.core.KafkaOffsetGetter$$anonfun$startOffsetListener$1.apply(KafkaOffsetGetter.scala:87)
                        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
                        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
                        at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
                        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
                        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
                        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
                        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
                

                jdk版本:jdk1.8.0_181
                kafka 版本:kafka_2.12-2.1.1
                zookeeper版本:zookeeper-3.4.14
                KafkaOffsetMonitor启动命令

                java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar \
                     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
                     --offsetStorage kafka \
                     --zk zkServer1:12181,zkServer2:12181 \
                     --port 8088 \
                     --refresh 10.seconds \
                     --retain 2.days
                

                一直在刷这个错误,web端也没监控到任何生产和消费,不知道是scala版本还是kafka版本不对

                • package com.quantifind.kafka.core
                  import java.nio.ByteBuffer
                  import com.quantifind.kafka.OffsetGetter.OffsetInfo
                  import com.quantifind.kafka.OffsetGetter
                  import com.quantifind.utils.ZkUtilsWrapper
                  import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
                  import kafka.common.{OffsetAndMetadata, TopicAndPartition}
                  import kafka.consumer.{ConsumerConnector, KafkaStream}
                  import kafka.message.MessageAndMetadata
                  import kafka.utils.Logging
                  import org.I0Itec.zkclient.ZkClient
                  import scala.collection._
                  import scala.concurrent.ExecutionContext.Implicits.global
                  import scala.concurrent.Future
                  import scala.util.control.NonFatal
                  class KafkaOffsetGetter(theZkClient: ZkClient, zkUtils: ZkUtilsWrapper = new ZkUtilsWrapper) extends OffsetGetter {
                    import KafkaOffsetGetter._
                    override val zkClient = theZkClient
                    override def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] = {
                      try {
                        zkUtils.getLeaderForPartition(zkClient, topic, pid) match {
                          case Some(bid) =>
                            val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(bid))
                            consumerOpt flatMap { consumer =>
                                val topicAndPartition = TopicAndPartition(topic, pid)
                                offsetMap.get(GroupTopicPartition(group, topicAndPartition)) map { offsetMetaData =>
                                  val request =
                                    OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
                                  val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
                                  OffsetInfo(group = group,
                                    topic = topic,
                                    partition = pid,
                                    offset = offsetMetaData.offset,
                                    logSize = logSize,
                                    owner = Some("NA"))
                                }
                            }
                          case None =>
                            error("No broker for partition %s - %s".format(topic, pid))
                            None
                        }
                      } catch {
                        case NonFatal(t) =>
                          error(s"Could not parse partition info. group: [$group] topic: [$topic]", t)
                          None
                      }
                    }
                    override def getGroups: Seq[String] = {
                      topicAndGroups.groupBy(_.group).keySet.toSeq
                    }
                    override def getTopicList(group: String): List[String] = {
                      topicAndGroups.filter(_.group == group).groupBy(_.topic).keySet.toList
                    }
                    override def getTopicMap: Map[String, scala.Seq[String]] = {
                      topicAndGroups.groupBy(_.topic).mapValues(_.map(_.group).toSeq)
                    }
                    override def getActiveTopicMap: Map[String, Seq[String]] = {
                      getTopicMap
                    }
                  }
                  object KafkaOffsetGetter extends Logging {
                    val ConsumerOffsetTopic = "__consumer_offsets"
                    val offsetMap: mutable.Map[GroupTopicPartition, OffsetAndMetadata] = mutable.HashMap()
                    val topicAndGroups: mutable.Set[TopicAndGroup] = mutable.HashSet()
                    def startOffsetListener(consumerConnector: ConsumerConnector) = {
                      Future {
                        try {
                          logger.info("Staring Kafka offset topic listener")
                          val offsetMsgStream: KafkaStream[Array[Byte], Array[Byte]] = consumerConnector
                            .createMessageStreams(Map(ConsumerOffsetTopic -> 1))
                            .get(ConsumerOffsetTopic).map(_.head) match {
                            case Some(s) => s
                            case None => throw new IllegalStateException("Cannot create a consumer stream on offset topic ")
                          }
                          val it = offsetMsgStream.iterator()
                          while (true) {
                            try {
                              val offsetMsg: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
                              val commitKey: GroupTopicPartition = readMessageKey(ByteBuffer.wrap(offsetMsg.key()))
                              val commitValue: OffsetAndMetadata = readMessageValue(ByteBuffer.wrap(offsetMsg.message()))
                              info("Processed commit message: " + commitKey + " => " + commitValue)
                              offsetMap += (commitKey -> commitValue)
                              topicAndGroups += TopicAndGroup(commitKey.topicPartition.topic, commitKey.group)
                              info(s"topicAndGroups = $topicAndGroups")
                            } catch {
                              case e: RuntimeException =>
                                // sometimes offsetMsg.key() || offsetMsg.message() throws NPE
                                warn("Failed to process one of the commit message due to exception. The 'bad' message will be skipped", e)
                            }
                          }
                        } catch {
                          case e: Throwable =>
                            fatal("Offset topic listener aborted dur to unexpected exception", e)
                            System.exit(1)
                        }
                      }
                    }
                    // massive code stealing from kafka.server.OffsetManager
                    import java.nio.ByteBuffer
                    import org.apache.kafka.common.protocol.types.Type.{INT32, INT64, STRING}
                    import org.apache.kafka.common.protocol.types.{Field, Schema, Struct}
                    private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema)
                    private val OFFSET_COMMIT_KEY_SCHEMA_V0 = new Schema(new Field("group", STRING),
                                                                         new Field("topic", STRING),
                                                                         new Field("partition", INT32))
                    private val KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("group")
                    private val KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("topic")
                    private val KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("partition")
                    private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
                                                                           new Field("metadata", STRING, "Associated metadata.", ""),
                                                                           new Field("timestamp", INT64))
                    private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
                                                                           new Field("metadata", STRING, "Associated metadata.", ""),
                                                                           new Field("commit_timestamp", INT64),
                                                                           new Field("expire_timestamp", INT64))
                    private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
                                                                           new Field("metadata", STRING, "Associated metadata.", ""),
                                                                           new Field("commit_timestamp", INT64))
                    private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(new Field("offset", INT64),
                                                                           new Field("leader_epoch", INT32),
                                                                           new Field("metadata", STRING, "Associated metadata.", ""),
                                                                           new Field("commit_timestamp", INT64))
                    private val VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
                    private val VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
                    private val VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
                    private val VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
                    private val VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
                    private val VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
                    private val VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
                    private val VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
                    private val VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
                    private val VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
                    private val VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
                    private val VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
                    private val VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
                    // private val VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
                    // map of versions to schemas
                    private val OFFSET_SCHEMAS = Map(0 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V0),
                                                     1 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V1),
                      2 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V2),
                      3 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V3))
                    private def schemaFor(version: Int) = {
                      val schemaOpt = OFFSET_SCHEMAS.get(version)
                      schemaOpt match {
                        case Some(schema) => schema
                        case _ => throw new RuntimeException("Unknown offset schema version " + version)
                      }
                    }
                    case class MessageValueStructAndVersion(value: Struct, version: Short)
                    case class TopicAndGroup(topic: String, group: String)
                    case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {
                      def this(group: String, topic: String, partition: Int) =
                        this(group, new TopicAndPartition(topic, partition))
                      override def toString =
                        "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
                    }
                    /**
                     * Decodes the offset messages' key
                     *
                     * @param buffer input byte-buffer
                     * @return an GroupTopicPartition object
                     */
                    private def readMessageKey(buffer: ByteBuffer): GroupTopicPartition = {
                      val version = buffer.getShort()
                      val keySchema = schemaFor(version).keySchema
                      val key = keySchema.read(buffer).asInstanceOf[Struct]
                      val group = key.get(KEY_GROUP_FIELD).asInstanceOf[String]
                      val topic = key.get(KEY_TOPIC_FIELD).asInstanceOf[String]
                      val partition = key.get(KEY_PARTITION_FIELD).asInstanceOf[Int]
                      GroupTopicPartition(group, TopicAndPartition(topic, partition))
                    }
                    /**
                     * Decodes the offset messages' payload and retrieves offset and metadata from it
                     *
                     * @param buffer input byte-buffer
                     * @return an offset-metadata object from the message
                     */
                    private def readMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
                      val structAndVersion = readMessageValueStruct(buffer)
                      if (structAndVersion.value == null) { // tombstone
                        null
                      } else {
                        if (structAndVersion.version == 0) {
                          val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
                          val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V0).asInstanceOf[String]
                          val timestamp = structAndVersion.value.get(VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]
                          OffsetAndMetadata(offset, metadata, timestamp)
                        } else if (structAndVersion.version == 1) {
                          val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
                          val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V1).asInstanceOf[String]
                          val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                          // not supported in 0.8.2
                          // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                          OffsetAndMetadata(offset, metadata, commitTimestamp)
                        } else if (structAndVersion.version == 2) {
                          val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
                          val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V2).asInstanceOf[String]
                          val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
                          // not supported in 0.8.2
                          // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                          OffsetAndMetadata(offset, metadata, commitTimestamp)
                        }else if(structAndVersion.version == 3){
                          val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
                          val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V3).asInstanceOf[String]
                          val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
                          // not supported in 0.8.2
                          // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                          OffsetAndMetadata(offset, metadata, commitTimestamp)
                        } else {
                          throw new IllegalStateException("Unknown offset message version: " + structAndVersion.version)
                        }
                      }
                    }
                    private def readMessageValueStruct(buffer: ByteBuffer): MessageValueStructAndVersion = {
                      if(buffer == null) { // tombstone
                        MessageValueStructAndVersion(null, -1)
                      } else {
                        val version = buffer.getShort()
                        val valueSchema = schemaFor(version).valueSchema
                        val value = valueSchema.read(buffer).asInstanceOf[Struct]
                        MessageValueStructAndVersion(value, version)
                      }
                    }
                  }
                  

                  改下源码

                    2019-07-16 17:30:21 WARN  KafkaOffsetGetter$:89 - Failed to process one of the c
                    ommit message due to exception. The 'bad' message will be skipped
                    java.lang.RuntimeException: Unknown offset schema version 3
                    

                    请问这个错误是什么意思

                    • package com.quantifind.kafka.core
                      import java.nio.ByteBuffer
                      import com.quantifind.kafka.OffsetGetter.OffsetInfo
                      import com.quantifind.kafka.OffsetGetter
                      import com.quantifind.utils.ZkUtilsWrapper
                      import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
                      import kafka.common.{OffsetAndMetadata, TopicAndPartition}
                      import kafka.consumer.{ConsumerConnector, KafkaStream}
                      import kafka.message.MessageAndMetadata
                      import kafka.utils.Logging
                      import org.I0Itec.zkclient.ZkClient
                      import scala.collection._
                      import scala.concurrent.ExecutionContext.Implicits.global
                      import scala.concurrent.Future
                      import scala.util.control.NonFatal
                      class KafkaOffsetGetter(theZkClient: ZkClient, zkUtils: ZkUtilsWrapper = new ZkUtilsWrapper) extends OffsetGetter {
                        import KafkaOffsetGetter._
                        override val zkClient = theZkClient
                        override def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] = {
                          try {
                            zkUtils.getLeaderForPartition(zkClient, topic, pid) match {
                              case Some(bid) =>
                                val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(bid))
                                consumerOpt flatMap { consumer =>
                                    val topicAndPartition = TopicAndPartition(topic, pid)
                                    offsetMap.get(GroupTopicPartition(group, topicAndPartition)) map { offsetMetaData =>
                                      val request =
                                        OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
                                      val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
                                      OffsetInfo(group = group,
                                        topic = topic,
                                        partition = pid,
                                        offset = offsetMetaData.offset,
                                        logSize = logSize,
                                        owner = Some("NA"))
                                    }
                                }
                              case None =>
                                error("No broker for partition %s - %s".format(topic, pid))
                                None
                            }
                          } catch {
                            case NonFatal(t) =>
                              error(s"Could not parse partition info. group: [$group] topic: [$topic]", t)
                              None
                          }
                        }
                        override def getGroups: Seq[String] = {
                          topicAndGroups.groupBy(_.group).keySet.toSeq
                        }
                        override def getTopicList(group: String): List[String] = {
                          topicAndGroups.filter(_.group == group).groupBy(_.topic).keySet.toList
                        }
                        override def getTopicMap: Map[String, scala.Seq[String]] = {
                          topicAndGroups.groupBy(_.topic).mapValues(_.map(_.group).toSeq)
                        }
                        override def getActiveTopicMap: Map[String, Seq[String]] = {
                          getTopicMap
                        }
                      }
                      object KafkaOffsetGetter extends Logging {
                        val ConsumerOffsetTopic = "__consumer_offsets"
                        val offsetMap: mutable.Map[GroupTopicPartition, OffsetAndMetadata] = mutable.HashMap()
                        val topicAndGroups: mutable.Set[TopicAndGroup] = mutable.HashSet()
                        def startOffsetListener(consumerConnector: ConsumerConnector) = {
                          Future {
                            try {
                              logger.info("Staring Kafka offset topic listener")
                              val offsetMsgStream: KafkaStream[Array[Byte], Array[Byte]] = consumerConnector
                                .createMessageStreams(Map(ConsumerOffsetTopic -> 1))
                                .get(ConsumerOffsetTopic).map(_.head) match {
                                case Some(s) => s
                                case None => throw new IllegalStateException("Cannot create a consumer stream on offset topic ")
                              }
                              val it = offsetMsgStream.iterator()
                              while (true) {
                                try {
                                  val offsetMsg: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
                                  val commitKey: GroupTopicPartition = readMessageKey(ByteBuffer.wrap(offsetMsg.key()))
                                  val commitValue: OffsetAndMetadata = readMessageValue(ByteBuffer.wrap(offsetMsg.message()))
                                  info("Processed commit message: " + commitKey + " => " + commitValue)
                                  offsetMap += (commitKey -> commitValue)
                                  topicAndGroups += TopicAndGroup(commitKey.topicPartition.topic, commitKey.group)
                                  info(s"topicAndGroups = $topicAndGroups")
                                } catch {
                                  case e: RuntimeException =>
                                    // sometimes offsetMsg.key() || offsetMsg.message() throws NPE
                                    warn("Failed to process one of the commit message due to exception. The 'bad' message will be skipped", e)
                                }
                              }
                            } catch {
                              case e: Throwable =>
                                fatal("Offset topic listener aborted dur to unexpected exception", e)
                                System.exit(1)
                            }
                          }
                        }
                        // massive code stealing from kafka.server.OffsetManager
                        import java.nio.ByteBuffer
                        import org.apache.kafka.common.protocol.types.Type.{INT32, INT64, STRING}
                        import org.apache.kafka.common.protocol.types.{Field, Schema, Struct}
                        private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema)
                        private val OFFSET_COMMIT_KEY_SCHEMA_V0 = new Schema(new Field("group", STRING),
                                                                             new Field("topic", STRING),
                                                                             new Field("partition", INT32))
                        private val KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("group")
                        private val KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("topic")
                        private val KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA_V0.get("partition")
                        private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
                                                                               new Field("metadata", STRING, "Associated metadata.", ""),
                                                                               new Field("timestamp", INT64))
                        private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
                                                                               new Field("metadata", STRING, "Associated metadata.", ""),
                                                                               new Field("commit_timestamp", INT64),
                                                                               new Field("expire_timestamp", INT64))
                        private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
                                                                               new Field("metadata", STRING, "Associated metadata.", ""),
                                                                               new Field("commit_timestamp", INT64))
                        private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(new Field("offset", INT64),
                                                                               new Field("leader_epoch", INT32),
                                                                               new Field("metadata", STRING, "Associated metadata.", ""),
                                                                               new Field("commit_timestamp", INT64))
                        private val VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
                        private val VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
                        private val VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
                        private val VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
                        private val VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
                        private val VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
                        private val VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
                        private val VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
                        private val VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
                        private val VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
                        private val VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
                        private val VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
                        private val VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
                        // private val VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
                        // map of versions to schemas
                        private val OFFSET_SCHEMAS = Map(0 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V0),
                                                         1 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V1),
                          2 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V2),
                          3 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V3))
                        private def schemaFor(version: Int) = {
                          val schemaOpt = OFFSET_SCHEMAS.get(version)
                          schemaOpt match {
                            case Some(schema) => schema
                            case _ => throw new RuntimeException("Unknown offset schema version " + version)
                          }
                        }
                        case class MessageValueStructAndVersion(value: Struct, version: Short)
                        case class TopicAndGroup(topic: String, group: String)
                        case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {
                          def this(group: String, topic: String, partition: Int) =
                            this(group, new TopicAndPartition(topic, partition))
                          override def toString =
                            "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
                        }
                        /**
                         * Decodes the offset messages' key
                         *
                         * @param buffer input byte-buffer
                         * @return an GroupTopicPartition object
                         */
                        private def readMessageKey(buffer: ByteBuffer): GroupTopicPartition = {
                          val version = buffer.getShort()
                          val keySchema = schemaFor(version).keySchema
                          val key = keySchema.read(buffer).asInstanceOf[Struct]
                          val group = key.get(KEY_GROUP_FIELD).asInstanceOf[String]
                          val topic = key.get(KEY_TOPIC_FIELD).asInstanceOf[String]
                          val partition = key.get(KEY_PARTITION_FIELD).asInstanceOf[Int]
                          GroupTopicPartition(group, TopicAndPartition(topic, partition))
                        }
                        /**
                         * Decodes the offset messages' payload and retrieves offset and metadata from it
                         *
                         * @param buffer input byte-buffer
                         * @return an offset-metadata object from the message
                         */
                        private def readMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
                          val structAndVersion = readMessageValueStruct(buffer)
                          if (structAndVersion.value == null) { // tombstone
                            null
                          } else {
                            if (structAndVersion.version == 0) {
                              val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
                              val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V0).asInstanceOf[String]
                              val timestamp = structAndVersion.value.get(VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]
                              OffsetAndMetadata(offset, metadata, timestamp)
                            } else if (structAndVersion.version == 1) {
                              val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
                              val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V1).asInstanceOf[String]
                              val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                              // not supported in 0.8.2
                              // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                              OffsetAndMetadata(offset, metadata, commitTimestamp)
                            } else if (structAndVersion.version == 2) {
                              val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
                              val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V2).asInstanceOf[String]
                              val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
                              // not supported in 0.8.2
                              // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                              OffsetAndMetadata(offset, metadata, commitTimestamp)
                            }else if(structAndVersion.version == 3){
                              val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
                              val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V3).asInstanceOf[String]
                              val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
                              // not supported in 0.8.2
                              // val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
                              OffsetAndMetadata(offset, metadata, commitTimestamp)
                            } else {
                              throw new IllegalStateException("Unknown offset message version: " + structAndVersion.version)
                            }
                          }
                        }
                        private def readMessageValueStruct(buffer: ByteBuffer): MessageValueStructAndVersion = {
                          if(buffer == null) { // tombstone
                            MessageValueStructAndVersion(null, -1)
                          } else {
                            val version = buffer.getShort()
                            val valueSchema = schemaFor(version).valueSchema
                            val value = valueSchema.read(buffer).asInstanceOf[Struct]
                            MessageValueStructAndVersion(value, version)
                          }
                        }
                      }
                      

                        请问大佬,消费者端会维护一个offset吗?

                        • 我前面是说业务处理成功了,提交那步出了问题,这就没办法了,然后服务端不会更新offset,如果本地也有维护备份offset的话,下次消费就可以对比获取到的消息中的offset,选择处理未消费部分

                            • 好吧,我们生产用了4年,先提交offset,然后处理业务,如果业务重的,我就降低了并发,提交offset的数量控制了,保证即使项目崩溃也丢不了几条消息(项目直接崩溃4年来没遇到过,也就是说从没丢过消息),只要是正常的关闭,都不会有问题。

                                • 嗯嗯,我前面想的也有问题,如果消费者重试提交有问题的话,消费组应该会把这个消费端抛弃掉再平衡,大佬,你们有把卡夫卡用在涉及到资金的交易里面吗,比如支付类的,感觉这种交易错一次都会被投诉…

                                    kafka 1.1.0 启动之后,打开页面,菜单能出来,页面没内容,点那个菜单都没反应,后台没报错,什么情况

                                    你好,我的kafka版本 kafka_2.12-1.1.0 ,按照你的流程部署,点击:

                                    Kafka Cluster Visualization
                                    Loading ...一直这样(我昨天做的SASL认证,是不是需要额外处理什么)
                                    topic list 我点击了是正常的能看到所有的 topic
                                    其他的点击没啥反应不显示东西
                                    另外我看了下日志报了 空指针异常:

                                    INFO  ConsumerFetcherManager:68 - [ConsumerFetcherManager-1541677023239] Added fetcher for partitions ArrayBuffer()
                                    2018-11-08 19:38:51 WARN  ConsumerFetcherManager$LeaderFinderThread:89 - [KafkaOffsetMonitor-1541677023090_server1.hzguode.com-1541677023185-3dacb5b0-leader-finder-thread], Failed to find leader for Set([__consumer_offsets,16], [__consumer_offsets,2], [__consumer_offsets,39], [__consumer_offsets,15], [__consumer_offsets,33], [__consumer_offsets,38], [__consumer_offsets,9], [__consumer_offsets,7], [__consumer_offsets,31], [__consumer_offsets,48], [__consumer_offsets,24], [__consumer_offsets,19], [__consumer_offsets,11], [__consumer_offsets,22], [__consumer_offsets,26], [__consumer_offsets,17], [__consumer_offsets,10], [__consumer_offsets,35], [__consumer_offsets,36], [__consumer_offsets,34], [__consumer_offsets,49], [__consumer_offsets,44], [__consumer_offsets,23], [__consumer_offsets,40], [__consumer_offsets,47], [__consumer_offsets,45], [__consumer_offsets,43], [__consumer_offsets,46], [__consumer_offsets,37], [__consumer_offsets,28], [__consumer_offsets,27], [__consumer_offsets,25], [__consumer_offsets,6], [__consumer_offsets,5], [__consumer_offsets,3], [__consumer_offsets,18], [__consumer_offsets,14], [__consumer_offsets,41], [__consumer_offsets,42], [__consumer_offsets,4], [__consumer_offsets,12], [__consumer_offsets,21], [__consumer_offsets,30], [__consumer_offsets,1], [__consumer_offsets,32], [__consumer_offsets,13], [__consumer_offsets,8], [__consumer_offsets,20], [__consumer_offsets,0], [__consumer_offsets,29])
                                    java.lang.NullPointerException
                                            at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)
                                            at kafka.cluster.Broker.connectionString(Broker.scala:62)
                                            at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
                                            at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
                                            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
                                            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
                                            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
                                            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
                                            at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
                                            at scala.collection.AbstractTraversable.map(Traversable.scala:105)
                                            at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
                                            at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
                                            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
                                    

                                    麻烦大神抽空帮忙给看下 搞了一上午 头大....

                                    进入某个topic后Unable to find Active Consumers,其实有个消费端,测试好多条发送消息和接受消息,在zk上也有consumer信息。
                                    在kafka-manager上能看得到,kafkaMonitor上没有,这是怎么回事呀