Kafka in practice: SASL/SCRAM

半兽人 posted on: 2019-10-23   Last updated: 2019-10-23 16:54:44

Create the credentials

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin

Verify the credentials

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name alice

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name admin

more /etc/kafka/kafka_server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin";

    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};

more /etc/kafka/kafka_client_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="alice"
    password="alice-secret";
};

consumer.properties and producer.properties

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256

ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=test1234

Start ZooKeeper

export KAFKA_OPTS=''
bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka

export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf'
bin/kafka-server-start.sh config/server.properties

Start the producer and consumer

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config config/producer.properties

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config config/consumer.properties
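As an added example (not code from the original post or from Kafka), the following Python reproduces what the kafka-configs.sh commands above actually store for a user and what the broker checks when a client authenticates: SCRAM-SHA-256 per RFC 5802, with the password replaced by salt, stored_key, server_key and iterations, which is also what --describe prints. The salt and the AuthMessage are constructed values; the password is the page's alice-secret.

```python
"""SCRAM-SHA-256 credential derivation and proof check as a Kafka broker performs them (RFC 5802).
The password itself is never stored; only keys derived from it are kept in ZooKeeper."""
import base64
import hashlib
import hmac
import os

def derive_scram_credential(password, salt, iterations=8192, algo="sha256"):
    """Broker-side record for one user, as written by --add-config 'SCRAM-SHA-256=[...]'."""
    salted = hashlib.pbkdf2_hmac(algo, password.encode("utf-8"), salt, iterations)
    client_key = hmac.new(salted, b"Client Key", algo).digest()
    return {
        "salt": salt,
        "stored_key": hashlib.new(algo, client_key).digest(),   # H(ClientKey)
        "server_key": hmac.new(salted, b"Server Key", algo).digest(),
        "iterations": iterations,
    }

def credential_to_string(cred):
    """Render in the salt=..,stored_key=..,server_key=..,iterations=N form shown by --describe."""
    b64 = lambda b: base64.b64encode(b).decode("ascii")
    return (f"salt={b64(cred['salt'])},stored_key={b64(cred['stored_key'])},"
            f"server_key={b64(cred['server_key'])},iterations={cred['iterations']}")

def client_proof(password, cred, auth_message, algo="sha256"):
    """Client side: ClientProof = ClientKey XOR HMAC(StoredKey, AuthMessage)."""
    salted = hashlib.pbkdf2_hmac(algo, password.encode("utf-8"), cred["salt"], cred["iterations"])
    client_key = hmac.new(salted, b"Client Key", algo).digest()
    stored_key = hashlib.new(algo, client_key).digest()
    signature = hmac.new(stored_key, auth_message, algo).digest()
    return bytes(a ^ b for a, b in zip(client_key, signature))

def verify_client_proof(cred, auth_message, proof, algo="sha256"):
    """Broker side: recover ClientKey from the proof and check H(ClientKey) == StoredKey.
    The broker needs only StoredKey, never the password, to authenticate the client."""
    signature = hmac.new(cred["stored_key"], auth_message, algo).digest()
    client_key = bytes(a ^ b for a, b in zip(proof, signature))
    return hmac.compare_digest(hashlib.new(algo, client_key).digest(), cred["stored_key"])

if __name__ == "__main__":
    # Constructed values: a random salt and the page's alice-secret password.
    cred = derive_scram_credential("alice-secret", os.urandom(16), 8192)
    print(credential_to_string(cred))
    # Constructed AuthMessage: client-first-bare "," server-first "," client-final-without-proof.
    auth_msg = b"n=alice,r=cnonce,r=cnoncesnonce,s=c2FsdA==,i=8192,c=biws,r=cnoncesnonce"
    print(verify_client_proof(cred, auth_msg, client_proof("alice-secret", cred, auth_msg)))
```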

The documentation this example is based on:

Kafka authentication with SASL/SCRAM






人生如梦! 1 month ago

Hello, after configuring authentication the first connection takes over 50 seconds. Is there any way to optimize this?

tiiimo 2 months ago

Hi, after adding authentication some of the .sh scripts no longer work, for example kafka-consumer-groups.sh. Looking for a tutorial!

半兽人 -> tiiimo 2 months ago

Bro, you are missing a command.

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"

tiiimo -> 半兽人 2 months ago

Hi, is this kafka_client_jaas.conf the consumer's conf? I added it and still have the problem

[root@cdh_app_server02 kafka_2.12-2.2.0]# KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka_2.12-2.2.0/config/cjfconsumerzhs.conf" bin/kafka-consumer-groups.sh --bootstrap-server 172.19.27.16:9092,172.19.27.46:9092,172.19.27.65:9092 --list
Error: Executing consumer group command failed due to org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listGroups(ConsumerGroupCommand.scala:131)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:57)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
    at org.apache.kafka.clients.admin.KafkaAdminClient$22.handleFailure(KafkaAdminClient.java:2615)
    at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:620)
    at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:736)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutPendingCalls(KafkaAdminClient.java:804)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1098)
    at java.lang.Thread.run(Thread.java:748)
半兽人 -> tiiimo 2 months ago

Failed to find brokers to send ListGroups

Try listing just one broker

bin/kafka-consumer-groups.sh --bootstrap-server 172.19.27.16:9092 --list

tiiimo -> 半兽人 2 months ago

Still the same error

tiiimo -> 半兽人 2 months ago

Could I add you on WeChat or QQ?

半兽人 -> tiiimo 2 months ago

Is your Kafka port no longer 9092... the address is wrong

tiiimo -> 半兽人 2 months ago

I can send messages and receive them,

[root@host-192-168-51-9 kafka_2.12-2.2.0]# bin/kafka-console-producer.sh --broker-list 172.19.27.16:9092,172.19.27.46:9092,172.19.27.65:9092 --topic cjfzhs --producer.config /opt/kafka/kafka_2.12-2.2.0/config/cjfproducerzhs.conf

1
2
3
[root@esb_mysql_slave kafka_2.12-2.2.0]# bin/kafka-console-consumer.sh --bootstrap-server 172.19.27.16:9092,172.19.27.46:9092,172.19.27.65:9092 --topic cjfzhs --from-beginning --consumer.config /opt/kafka/kafka_2.12-2.2.0/config/cjfconsumerzhs.conf --group cjfGroupzhs

1
3
2

This is with SCRAM authentication added~~

半兽人 -> tiiimo 2 months ago

Check the following setting in your server.properties:

listeners=

tiiimo -> 半兽人 2 months ago

sasl.enabled.mechanisms=SCRAM-SHA-512
# Enable SCRAM for inter-broker communication, using SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
listener.security.protocol.map=INSIDE:SASL_PLAINTEXT,OUTSIDE:PLAINTEXT
inter.broker.listener.name=INSIDE
listeners=INSIDE://:9092,OUTSIDE://:8088
advertised.listeners=INSIDE://172.19.27.16:9092,OUTSIDE://SjwESB1:8088

半兽人 -> tiiimo 2 months ago

Your config is really a mess...
Where did you get this tutorial...

tiiimo -> 半兽人 2 months ago

It wasn't me who set it up; I'll rebuild it tomorrow, I can't stand looking at it either

风起云涌 9 months ago

I'd like to ask: I've recently been learning to configure SASL/SCRAM on Kafka 2.5.0 and have some questions after reading the docs:
1. kafka_server_jaas.conf configures org.apache.kafka.common.security.plain.PlainLoginModule required. Is that mandatory, or a mistake?
2. Does the whole SASL/SCRAM setup require configuring ZooKeeper too? I don't see it in the docs. If so, how?
3. With SASL/SCRAM configured on Kafka, how do tools such as Kafka Tool or Kafka Eagle connect to the cluster for management?

tiiimo -> 风起云涌 2 months ago

Boss, is there any way around it? All the .sh scripts error out

风起云涌 -> tiiimo 2 months ago

How are you running them? What error do you get? Is the installation wrong?

tiiimo -> 风起云涌 2 months ago

As soon as SCRAM authentication is added, I get this error

[root@cdh_app_server02 kafka_2.12-2.2.0]# bin/kafka-consumer-groups.sh --bootstrap-server 172.19.27.16:9092,172.19.27.46:9092,172.19.27.65:9092 --describe --group cjfGroupzhs
Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:331)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:251)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:59)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

Could I add you on WeChat or QQ?

风起云涌 -> tiiimo 2 months ago

WeChat: bisaluo_tao
