kafka安装和启动

原创
半兽人 发表于: 2015-02-07   最后更新时间: 2021-04-22 11:04:38  
{{totalSubscript}} 订阅,229993 游览

kafka安装和启动

kafka的背景知识已经讲了很多了,让我们现在开始实践吧,假设你现在没有KafkaZooKeeper环境。

Step 1: 下载代码

下载并且解压它。

> tar -xzf kafka_2.13-2.8.0.tgz
> cd kafka_2.13-2.8.0

Step 2: 启动服务

注意:你的本地环境必须安装有Java 8+。

运行kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果你没有Zookeeper,你可以使用kafka自带打包和配置好的Zookeeper。

# 注意:很快,Apache Kafka将不再需要ZooKeeper。
> bin/zookeeper-server-start.sh config/zookeeper.properties
...

打开另一个命令终端启动kafka服务:

> bin/kafka-server-start.sh config/server.properties &

一旦所有服务成功启动,那Kafka已经可以使用了。

Step 3: 创建一个主题(topic)

创建一个名为“test”的Topic,只有一个分区和一个备份:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

创建好之后,可以通过运行以下命令,查看已创建的topic信息:

> bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events  PartitionCount:1    ReplicationFactor:1 Configs:
Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0

或者,除了手工创建topic外,你也可以配置你的broker,当发布一个不存在的topic时自动创建topic。

Step 4: 发送消息

Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。

运行 producer(生产者),然后在控制台输入几条消息到服务器。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

Step 5: 消费消息

Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来,新打开一个命令控制台,输入:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

如果你有2台不同的终端上运行上述命令,那么当你在运行生产者时,消费者就能消费到生产者发送的消息。

可以使用Ctrl-C停止消费者客户端。

kafka集群搭建

Step 6: 使用 Kafka Connect 来 导入/导出 数据

你可能在现有的系统中拥有大量的数据,如关系型数据库传统的消息传递系统,以及许多已经使用这些系统的应用程序。Kafka Connect允许你不断地从外部系统提取数据到Kafka,反之亦然。用Kafka整合现有的系统是非常容易的。为了使这个过程更加容易,有数百个这样的连接器现成可用。

查看Kafka Connect部分,了解更多关于如何将你的数据持续导入和导出Kafka。

Step 7: 使用Kafka Stream来处理数据

一旦你的数据存储在Kafka中,你就可以用Kafka Streams客户端库来处理这些数据,该库适用于Java/Scala。它允许你实现自己的实时应用程序和微服务,其中输入和/或输出数据存储在Kafka主题中。Kafka Streams将在客户端编写和部署标准Java和Scala应用程序的简单性与Kafka服务器端集群技术的优势相结合,使这些应用程序具有可扩展性弹性容错性分布式。该库支持精确的一次性处理、有状态操作和聚合、窗口化、连接、基于事件时间的处理等等。

为了给你一个初步的体验,这里是如何实现流行的WordCount算法的:

KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((keyIgnored, word) -> word)
            .count();

wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

Kafka流演示例子和应用开发教程展示了如何从头到尾编码和运行这样一个流式应用。

Step 8:停止Kafka

现在你已经达到了快速入门的终点,可以随时卸载Kafka环境,或者继续玩下去。

  1. 使用Ctrl-C停止生产者和消费者客户端。
  2. 使用Ctrl-C停止Kafka broker。
  3. 最后,用Ctrl-C停止ZooKeeper。

如果你还想删除你的本地Kafka环境的数据,包括你创建的消息,运行命令。

$ rm -rf /tmp/kafka-logs /tmp/zookeeper


您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容


上一条: Kafka的使用场景
下一条: kafka的生态系统

唯一love 25天前

此处的wordcount输入主题不对吧,看流那一节的演示案例,没有问题

半兽人 -> 唯一love 24天前

感谢提醒,晚会我们进行核实。

半兽人 -> 唯一love 19天前

已经将文章重新整理了,迁移到流那一节了。

不二の 2月前

大佬,replicas 和ISR中有-1是表示有节点挂了吗

不二の -> 半兽人 2月前

那样的话会不会造成数据丢失

半兽人 -> 不二の 2月前

如果分区的副本都是-1了,就会丢消息。
https://www.orchome.com/6#item-6

不二の -> 半兽人 2月前

明白了,谢谢大佬

最新更新2021年12月的么,我是穿越了吗

取得是系统时间,程序有bug!!!

我又更新了一下,是正确的时间....

4月前

您好,输入命令

>bin\windows\connect-standalone.bat config\connect-standalone
.properties config\connect-file-source.properties config\connext-file-sink.properties

后提示 ERROR Stopping after connector error

应该怎么处理呀?

我在上步操作的时候多打了一个 > echo foo>test.txt 因为没有加空格,也没反应我以为是空格问题,所以就加空格重新输入了一遍

半兽人 -> 4月前
cmd .\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\connect-bbdd.properties
陈什什什 5月前

你好,bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testof1 --from-beginning命令输入完,什么反应都没有,这是咋回事

半兽人 -> 陈什什什 5月前

发送消息的时候,确认成功了吗?

陈什什什 -> 半兽人 5月前

是的,使用java的消费者都可以接受到消息,在服务器上面执行这条命令,像是啥也不输入,只按了回车键一样

半兽人 -> 陈什什什 5月前

应该最后会报超时,java既然成功了,应该配置的不是localhost:9092这个,换成跟java一样具体的ip试试。

陈什什什 -> 半兽人 5月前
[root@localhost kafka_2.12-2.0.1]# bin/kafka-console-consumer.sh --bootstrap-server *****:9092 --topic testof1 --from-beginning
[root@localhost kafka_2.12-2.0.1]#

这是命令执行之后的,之前正常的时候应该像这篇文章中说到的那样,我这里就直接没反应

半兽人 -> 陈什什什 5月前

如果没有任何数据 也没有报错 那这个topic里没有任何消息。

西木 5月前

我按照步骤创建完三个kafka节点,使用bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic 查看创建情况,结果执行后没有什么反馈信息,请问是怎么回事?

半兽人 -> 西木 5月前

用这个试试

bin/kafka-topics.sh --describe --topic my-replicated-topic --bootstrap-server localhost:9092
西木 -> 半兽人 5月前

谢谢,可以了,我不知道原因出在哪里,索性就重装了虚拟机,再重新操作后可以了。

半兽人 -> 西木 5月前

嗯,有问题在提问吧

小夕夕 9月前

listeners 这个参数应该怎么配置,是默认为空还是ip地址?

半兽人 -> 小夕夕 9月前

本机内访问可以写空,如果其他机器需要访问的话,写内网ip。

红色彗星 9月前

WordCountDemo例子有人看懂的吗?虽然思想很简单,但是感觉没有main函数是不是感觉不完全呢

红色彗星 9月前

你好,输入ps | grep server-1.properties没有返回进程信息,如何解决呢?前面启动的时候已经提示 INFO [KafkaServer id=1] started (kafka.server.KafkaServer)了

半兽人 -> 红色彗星 9月前

你是开多个命令窗口吗。

红色彗星 9月前

学习的时候不能安装kafka单机版吗?感觉跑不起三个虚拟机

半兽人 -> 红色彗星 9月前

这个例子就是在一台机器上跑集群的呀。

红色彗星 -> 半兽人 9月前

哦哦。我还以为要几个虚拟机呢emmm抱歉,应该是我之前搜索到的网上的安装教程都是装三台虚拟机的小集群,我才想到这个的。

提问