kafka Raft模式安装和启动(去zk)

半兽人 发表于: 2021-10-01   最后更新时间: 2022-09-22 00:36:29  
{{totalSubscript}} 订阅, 6,093 游览

kafka Raft安装和启动(单节点)

从kafka2.8版本之后,Raft模式已经正式使用了,无需安装zookeeper,本节带大家抢险体验。

Step 1: 下载代码

下载并且解压它。

$ tar -xzf kafka_2.13-3.2.3.tgz
$ cd kafka_2.13-3.2.3

Step 2: 生成集群id,并格式化存储目录

生成集群ID

$ bin/kafka-storage.sh random-uuid

Cba3BkapTFWyEKbpUkNJ_w #获取返回的集群ID

格式化存储目录

$ bin/kafka-storage.sh format -t Cba3BkapTFWyEKbpUkNJ_w -c config/kraft/server.properties

输出:

Formatting /tmp/kraft-combined-logs

如果是多节点安装,确保每个节点使用的集群ID是相同的。

Step 3: 启动服务

注意:你的本地环境必须安装有Java 9+。

$ bin/kafka-server-start.sh config/kraft/server.properties &

一旦成功启动,那Kafka已经可以使用了。

Step 4: 创建一个主题(topic)

创建一个名为“test”的Topic,只有一个分区和一个备份:

$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

创建好之后,可以通过运行以下命令,查看已创建的topic信息:

$ bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

或者,除了手工创建topic外,你也可以配置你的broker,当发布一个不存在的topic时自动创建topic,点击这里查看如何配置自动创建topic时设置默认的分区和副本数

Step 5: 发送消息

Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。

运行 producer(生产者),然后在控制台输入几条消息到服务器。

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

Step 6: 消费消息

Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来,新打开一个命令控制台,输入:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer.properties
This is a message
This is another message

如果你有2台不同的终端上运行上述命令,那么当你在运行生产者时,消费者就能消费到生产者发送的消息。

是的,整个过程没有用到zookeeper。

相关

更新于 2022-09-22
在线,1小时前登录

亞火削風 7月前

找了一圈, 没有发现什么好用的 支持 KRaft (zk-less) 的 Kafka GUI 工具, https://github.com/provectus/kafka-ui 这貌似支持, 在尝试

玉言 1年前

大佬能帮看下是什么原因么,win10环境下,raft模式单节点启动报错,提示quorum-state另一个程序正在使用此文件,进程无法访问,查看生成目录下并无这个文件,我手动加了个空文件,被进程自己删掉了,错误日志:

[2022-08-10 18:10:22,760] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.io.UncheckedIOException: Error while writing the Quorum status from the file D:\tmp\kraft-combined-logs\__cluster_metadata-0\quorum-state
        at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:155)
        at org.apache.kafka.raft.FileBasedStateStore.writeElectionState(FileBasedStateStore.java:128)
        at org.apache.kafka.raft.QuorumState.transitionTo(QuorumState.java:477)
        at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:212)
        at org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:364)
        at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:203)
        at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:125)
        at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:76)
        at kafka.Kafka$.buildServer(Kafka.scala:79)
        at kafka.Kafka$.main(Kafka.scala:87)
        at kafka.Kafka.main(Kafka.scala)
Caused by: java.nio.file.FileSystemException: D:\tmp\kraft-combined-logs\__cluster_metadata-0\quorum-state.tmp -> D:\tmp\kraft-combined-logs\__cluster_metadata-0\quorum-state: 另一个程序正在使用此文件,进程无法访问。

        at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
        at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
        at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
        at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
        at java.base/java.nio.file.Files.move(Files.java:1421)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:935)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:918)
        at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)
        ... 10 more
        Suppressed: java.nio.file.FileSystemException: D:\tmp\kraft-combined-logs\__cluster_metadata-0\quorum-state.tmp -> D:\tmp\kraft-combined-logs\__cluster_metadata-0\quorum-state: 另一个程序正在使用此文件,进程无法访问。

                at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
                at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
                at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:309)
                at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
                at java.base/java.nio.file.Files.move(Files.java:1421)
                at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:932)
                ... 12 more
半兽人 -> 玉言 1年前

这个错误是kafka进程占用了文件,又起了新的就会报,之前的没死掉了,文件被占用。

月舞花弄影 -> 玉言 1年前

请问这个问题怎么解决的

2个方案:
1、找到这个进程kill掉。
2、重启

我是windows环境,这个错误是在启动的时候(kafka没有启动成功),之前没有启动过kafka。与jdk版本有吗? 我是用的1.8。
另外按您的2个方案试了试也不行。没有找到相关的kafka进程;重启试了一下也不行

如果是这样,你去把占用的文件删除掉。
启动kafka进程的权限问题,我记得window有以管理员的身份运行。

恩恩,谢谢。我换虚机了,用linux可以。window有可能与权限有关,抽空我试试。
另外再请教您一个问题:现在我对接sql-connector,但一直报【Failed to find any class that implements Connector and which name matches io.debezium.connector.sqlserver.SqlServerConnector】插件配置了用的是【debezium-connector-sqlserver-2.2.1.Final-plugin.tar.gz】的,启动的时候也有看到日志插件加载了,但配置连接器的时候一直报500,没有找到那个连接器的实现

你到问题专区提问吧,这里只是简单对答,需要带上你的操作步骤,不然我不知道上下文,无法帮助你。

可以连接了,是版本问题,对应的版本和配置的类型包不匹配导致的。谢谢您

我也是win10遇到同样的问题,用的kafka_2.13-3.3.2.tgz,请问下是什么版本不对啊。

我换centos了。
你用管理员启动一下试试

谢谢!多节点和管理员启动都试过不行,可能windows还是暂时用不了

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章