kafka实战SASL/SCRAM

半兽人 发表于: 2019-10-23   最后更新时间: 2019-10-23 16:54:44  
{{totalSubscript}} 订阅, 12,580 游览

创建证书

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin

验证证书

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name alice

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name admin

more /etc/kafka/kafka_server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin";

    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};

more /etc/kafka/kafka_client_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="alice"
    password="alice-secret";
};

consumer.properties 和 producer.properties

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256

ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=test1234

启动zk

export KAFKA_OPTS=''
bin/zookeeper-server-start.sh config/zookeeper.properties

启动kafka

export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf'
bin/kafka-server-start.sh config/server.properties

启动生产者和消费者

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config config/producer.properties 

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config config/consumer.properties

本例说明文档来自

kafka使用SASL/SCRAM认证

更新于 2019-10-23
在线,4小时前登录

呼哈 9月前

大佬好! 请问笔记目录下有SASL的各种实战例子,那我们有没有ACL的实战例子呢?
请教一下如果已经给某topic配置了ACL,想要启动该topic的消费者该在基础命令上加什么呢? 感谢感谢!!

半兽人 -> 呼哈 9月前

这篇文章应该对你有帮助:kafka认证和acl

呼哈 -> 半兽人 9月前

是的, 我有学习这篇文章! 这边文章更多的是给topic配置acl权限。我想请教您的是配置了acl的topic启动消费者的命令是什么呢?bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test 这样的话当然是链接不上的。该怎么更改这个命令呢? 感谢!!

半兽人 -> 呼哈 9月前

kafka_client_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="alice"
    password="alice-secret";
};

运行:

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config config/producer.properties 

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config config/consumer.properties
1年前

你好,我是使用docker进行的搭建,kafka运行不起来,报如下错误,求大佬指点迷津

ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.SecurityException: zookeeper.set.acl is true, but ZooKeeper client TLS configuration identifying at least kafka.server.KafkaConfig$@5158b42f.ZkSslClientEnableProp, kafka.server.KafkaConfig$@5158b42f.ZkClientCnxnSocketProp, and kafka.server.KafkaConfig$@5158b42f.ZkSslKeyStoreLocationProp was not present and the verification of the JAAS login file failed [java.security.auth.login.config=/opt/kafka/config/kafka_client_jaas.conf, zookeeper.sasl.client=default:true, zookeeper.sasl.clientconfig=default:Client]
        at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:445)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:191)
        at kafka.Kafka$.main(Kafka.scala:109)
        at kafka.Kafka.main(Kafka.scala)
半兽人 -> 1年前

你加了验证,但是配置里没找到相关的配置。

南~风 1年前

大佬,kafka_2.11-2.1.1版本

jaas.conf

KafkaServer {
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="admin"
 password="admin@123"
 user_admin="admin@123"
 user_producer="producer@123"
 user_consumer="consumer@123";
};
KafkaClient{
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="kafka"
 password="kafka@123";
};
ZookeeperClient{
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="kafka"
 password="kafka@123";
};
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/root/upgrep/kafka_2.11-2.1.1/config/jaas.conf"
fi

报错:

WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/root/upgrep/kafka_2.11-2.1.1/config/jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
ERROR [ZooKeeperClient] Auth failed. (kafka.zookeeper.ZooKeeperClient)

求大佬指点迷津啊,谢谢大佬

半兽人 -> 南~风 1年前

你验证过这些用户都正常吗,例如:

## 验证证书
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name alice

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name admin
Shine 2年前

大佬,kafka 2.2.0 ,添加了 kafka_server_jaas.conf 文件

KafkaServer {
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="admin"
        compassword="admin-unicom";
};

Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka"
        password="kafka-unicom";
};

kafka kafka-run-class.sh 也添加了

# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
  KAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka/kafka_2.12-2.2.0/config/kafka_server_jaas.conf"
fi

还是启动报错

ERROR [Controller id=0, targetBrokerId=0] Connection to node 0 (host088/132.46.109.57:9092) failed authentication due to: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: User name could not be obtained [Caused by javax.security.auth.callback.UnsupportedCallbackException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user.]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state. (org.apache.kafka.clients.NetworkClient)

麻烦大佬帮看一下,谢谢

半兽人 -> Shine 2年前

客户端的版本太低了。

Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user.

翻译过来就是

无法登录:客户端被要求提供密码,但Kafka客户端代码目前不支持从用户那里获得密码。

Shine -> 半兽人 2年前

关键我kafka启动都报错啊,难道是kafka 2.2.0的问题吗?

Lioa- 2年前

ssl会影响kafka不开启zero copy,那除了SASL_PLAINTEXT之外的认证会影响吗

半兽人 -> Lioa- 2年前

信息太少了,到问题专区详细提问吧。

人生如梦! 3年前

您好,配置权限后,第一次连接要50多秒,这个有什么办法优化一下吗?

加cpu..没必要吧,提升运算。

tiiimo 3年前

你好加上权限之后,部分.sh不能用了,比如:kafka-consumer-groups.sh。 求教程!

半兽人 -> tiiimo 3年前

大兄弟,你漏命令呀。

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
tiiimo -> 半兽人 3年前

你好 这个 kafka_client_jaas.conf 是 消费者的conf吗,我加上还是有问题

[root@cdh_app_server02 kafka_2.12-2.2.0]# KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka_2.12-2.2.0/config/cjfconsumerzhs.conf" bin/kafka-consumer-groups.sh --bootstrap-server 172.19.27.16:9092,172.19.27.46:9092,172.19.27.65:9092 --list
Error: Executing consumer group command failed due to org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listGroups(ConsumerGroupCommand.scala:131)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:57)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
    at org.apache.kafka.clients.admin.KafkaAdminClient$22.handleFailure(KafkaAdminClient.java:2615)
    at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:620)
    at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:736)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutPendingCalls(KafkaAdminClient.java:804)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1098)
    at java.lang.Thread.run(Thread.java:748)
半兽人 -> tiiimo 3年前
Failed to find brokers to send ListGroups

你试试只写一个broker

bin/kafka-consumer-groups.sh --bootstrap-server 172.19.27.16:9092 --list
tiiimo -> 半兽人 3年前

还是报相同的错

tiiimo -> 半兽人 3年前

可以加您个微信,或者qq吗

半兽人 -> tiiimo 3年前

你kafka的端口是不是已经不是9092了...地址错了

tiiimo -> 半兽人 3年前

我能发送消息和监听到消息,

[root@host-192-168-51-9 kafka_2.12-2.2.0]# bin/kafka-console-producer.sh --broker-list 172.19.27.16:9092,172.19.27.46:9092,172.19.27.65:9092 --topic cjfzhs --producer.config /opt/kafka/kafka_2.12-2.2.0/config/cjfproducerzhs.conf

1
2
3
[root@esb_mysql_slave kafka_2.12-2.2.0]# bin/kafka-console-consumer.sh --bootstrap-server 172.19.27.16:9092,172.19.27.46:9092,172.19.27.65:9092 --topic cjfzhs --from-beginning --consumer.config /opt/kafka/kafka_2.12-2.2.0/config/cjfconsumerzhs.conf --group cjfGroupzhs

1
3
2

我这个是加了 scram认证~~

半兽人 -> tiiimo 3年前

看看你server.properties里

listeners=

的配置

tiiimo -> 半兽人 3年前
sasl.enabled.mechanisms=SCRAM-SHA-512
# 为broker间通讯开启SCRAM机制,采用SCRAM-SHA-512算法
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
listener.security.protocol.map=INSIDE:SASL_PLAINTEXT,OUTSIDE:PLAINTEXT
inter.broker.listener.name=INSIDE
listeners=INSIDE://:9092,OUTSIDE://:8088
advertised.listeners=INSIDE://172.19.27.16:9092,OUTSIDE://SjwESB1:8088
半兽人 -> tiiimo 3年前

你这个配置,真的是乱那。。
哪里看的教程。。

tiiimo -> 半兽人 3年前

不是我搞得,明天我重新搭一下 我也看不下去了

风起云涌 3年前

想问下,我最近在学习kafka2.5.0配置sasl/scram,看了文档有一些疑问:
1、kafka_server_jaas.conf文件里配置了org.apache.kafka.common.security.plain.PlainLoginModule required,是必须的吗?还是写错了
2、sasl/scram整个配置中,需要对zookeeper进行配置吗?我看文档里没有,如果需要,要如何配置?
3、kafka配置了sasl/scram,工具kafka tool,kafka eagle等,如何连接集群进行管理?

tiiimo -> 风起云涌 3年前

大佬 有什么方法吗,各种.sh 都报错

风起云涌 -> tiiimo 3年前

怎么运行?怎么样的报错呢?是不是安装有问题啊?

tiiimo -> 风起云涌 3年前

就是加上了 SCRAM权限,就报这样的错误

[root@cdh_app_server02 kafka_2.12-2.2.0]# bin/kafka-consumer-groups.sh --bootstrap-server 172.19.27.16:9092,172.19.27.46:9092,172.19.27.65:9092 --describe --group cjfGroupzhs
Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:331)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:251)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:59)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

能加您个微信 或者qq吗

风起云涌 -> tiiimo 3年前

微信bisaluo_tao

命令后面增加 --command-config ./sasl.conf
sasl.conf中,增加如下配置:

#### cat sasl.conf #####
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章