kafka命令大全

半兽人 发表于: 2016-10-25   最后更新时间: 2019-12-01  

整理kafka相关的常用命令

管理

## 创建主题(4个分区,2个副本)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test

查询

## 查询集群描述
bin/kafka-topics.sh --describe --zookeeper 

## topic列表查询
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

## topic列表查询(支持0.9版本+)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

## 新消费者列表查询(支持0.9版本+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

## 新消费者列表查询(支持0.10版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

## 显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test

## 显示某个消费组的消费详情(0.9版本 - 0.10.1.0 之前)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group

## 显示某个消费组的消费详情(0.10.1.0版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

发送和消费

## 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

## 消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

## 新生产者(支持0.9版本+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties

## 新消费者(支持0.9版本+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties

## 高级点的用法
bin/kafka-simple-consumer-shell.sh --brist localhost:9092 --topic test --partition 0 --offset 1234  --max-messages 10

平衡leader

bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot

kafka自带压测命令

bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=localhost:9092

增加副本

  1. 创建规则json

    cat > increase-replication-factor.json <<EOF
    {"version":1, "partitions":[
    {"topic":"__consumer_offsets","partition":0,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":1,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":2,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":3,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":4,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":5,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":6,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":7,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":8,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":9,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":10,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":11,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":12,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":13,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":14,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":15,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":16,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":17,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":18,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":19,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":20,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":21,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":22,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":23,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":24,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":25,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":26,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":27,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":28,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":29,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":30,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":31,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":32,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":33,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":34,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":35,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":36,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":37,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":38,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":39,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":40,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":41,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":42,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":43,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":44,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":45,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":46,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":47,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":48,"replicas":[0,1]},
    {"topic":"__consumer_offsets","partition":49,"replicas":[0,1]}]
    }
    EOF
    
  2. 执行

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
    
  3. 验证

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
    


您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容




上一条: kafka源码在idea运行并断点调试
下一条: 获取kafka版本

  • 博主,您好:
    想请教个kafka副本扩容问题:(2个broker,2个分区,1个副本)
    今天创建了一个topic,指定了2个分区,1个副本,后来想把副 本修改为2个,按照操作步骤执行:
    1、创建json文件

    {
    "partitions":
    [
    {
    "topic":"queue",
    "partition": 0,
    "replicas": [1,2]
    },
    {
    "topic": "queue",
    "partition": 1,
    "replicas": [2,1]
    }
    ],
    "version":1
    }
    

    2、./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file test.json --execute
    3、./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file test.json --verify
    以上操作,终端都没提示错误,都正常执行
    但是当我停掉其中一台kafka后,消费者就消费不到数据了,这种情况我不知道如何排查了,希望博主能帮我看看

    • 您的意思是保存偏移量的这个topic也要2副本,是吧? 我刚刚看了下,我的两个broker里,一个保存的是奇数的分区,一个保存的是偶数的分区。
      那我现在要对偏移量进行增加副本操作了?

        还有大大 问一下 我用java写producer的时候 比如有多个topic 但是只有其中一个topic能用 其他都会报错并且会在linux下产生多个sonsumer-console进程 然后consumer 直接不能用

        ./kafka-consumer-groups.sh --list --new-consumer --bootstrap-server localhost:9092
        Exception in thread "main" joptsimple.UnrecognizedOptionException: new-consumer is not a recognized option
        at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
        at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
        at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
        at joptsimple.OptionParser.parse(OptionParser.java:396)
        at kafka.admin.ConsumerGroupCommand$ConsumerGroupCommandOptions.(ConsumerGroupCommand.scala:725)
        at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:42)
        at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)

         bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
        执行此命令,有报错:new-consumer is not a recognized option

        没遇到过这个情况

        您好,有没有用Java API创建,查找topic的文章?

        • 做的是一个先查询,如果没有这个topic,则去创建topic。
          我用网上查找的TopicCommand.main(options);报错运行不出来。。。

            我这kafka内存占用率下不来,什么情况,消息也过了失效的时间。

            • 咦,我记得回答过你的问题。
              kafka是基于jvm的,充分利用当前内存,当有别的进程启动的时候,会释放这部分内存。

                .sh在命令台运行不了啊,不是只能运行.bat文件吗

                请教一下,我使用的是2.11版本的kafka,在Susue系统上使用命令创建生产者和消费者进行测试,我都是直接ctrl+C直接退出生产者/消费者的窗口,但ps进程的时候发现生产者和消费者的进程是依然存在的...而且ps -ef|grep java | grep kafka 出来的内容没有办法区分这个进程是生产还是消费者,甚至是否为broker也无法区分..有什么办法可以不使用生产/消费者的时候就直接kill 进程呢?
                并且 kafka-server-stop.sh 以及zookeeper-server-stop.sh 脚本并没有什么用处......

                • jps倒是可以..但是应该也不能区分到底是哪个进程吧。zookeeper可以区分,但是broker和consumer,producer貌似没法区分~~~~感谢你的所有文章以及回复。

                    我的mac运行消费者命令时候一直显示kafka-console-producer.sh:commend not found,请问是咋回事,拜谢!

                    我想在shell里面调kafka的rest api,不知道是否可行?