Create credentials
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin
Verify credentials
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name alice
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name admin
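On newer Kafka releases (2.7 and later, as far as I know), SCRAM credentials can also be managed through a running broker with --bootstrap-server instead of --zookeeper. The following is only a sketch under assumptions: the admin user already exists (for example, created with the --zookeeper commands above before the first broker start), the broker listens on localhost:9093 with SASL_SSL, and the KAFKA_HOME variable and the admin.properties path are hypothetical names chosen for this example.

```shell
#!/bin/sh
# Sketch only: create and inspect SCRAM credentials through a running broker
# instead of ZooKeeper (assumes Kafka 2.7 or newer).
KAFKA_HOME="${KAFKA_HOME:-.}"   # assumption: Kafka installation directory
ADMIN_CONF="${TMPDIR:-/tmp}/kafka-scram-demo/admin.properties"   # hypothetical path
mkdir -p "$(dirname "$ADMIN_CONF")"

# Admin client settings. The admin user must already have SCRAM credentials.
cat > "$ADMIN_CONF" <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=test1234
EOF

if [ -x "$KAFKA_HOME/bin/kafka-configs.sh" ]; then
  # Create (or update) alice's SCRAM-SHA-256 credential via the broker.
  "$KAFKA_HOME/bin/kafka-configs.sh" --bootstrap-server localhost:9093 \
    --command-config "$ADMIN_CONF" --alter \
    --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret]' \
    --entity-type users --entity-name alice
  # Show what is stored for alice.
  "$KAFKA_HOME/bin/kafka-configs.sh" --bootstrap-server localhost:9093 \
    --command-config "$ADMIN_CONF" --describe \
    --entity-type users --entity-name alice
else
  echo "kafka-configs.sh not found under $KAFKA_HOME; only wrote $ADMIN_CONF"
fi
```

Because the password sits in admin.properties in clear text, the file should be readable only by the operator.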
more /etc/kafka/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret"
user_admin="admin";
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};
more /etc/kafka/kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="alice"
password="alice-secret";
};
consumer.properties and producer.properties
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=test1234
Start ZooKeeper
export KAFKA_OPTS=''
bin/zookeeper-server-start.sh config/zookeeper.properties
Start Kafka
export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf'
bin/kafka-server-start.sh config/server.properties
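The steps here do not list the broker-side settings in config/server.properties that go with the JAAS file. A minimal sketch of what they might look like follows, assuming a single SASL_SSL listener on localhost:9093 (the port the clients use) and SCRAM-SHA-256 for both clients and inter-broker traffic. The keystore and truststore paths and passwords, and the fragment file name, are assumptions made for illustration.

```shell
#!/bin/sh
# Sketch only: write the broker settings to a separate fragment for review,
# then merge them into config/server.properties by hand.
FRAGMENT="${TMPDIR:-/tmp}/kafka-scram-demo/server-sasl.properties"   # hypothetical path
mkdir -p "$(dirname "$FRAGMENT")"

cat > "$FRAGMENT" <<'EOF'
# Listener that the clients reach on localhost:9093
listeners=SASL_SSL://localhost:9093
security.inter.broker.protocol=SASL_SSL
# Brokers authenticate to each other as the admin user from the KafkaServer JAAS section
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
# Assumed keystore/truststore locations and passwords
ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=test1234
EOF

echo "Review $FRAGMENT, then merge it into config/server.properties"
```

If the PLAIN login module in the JAAS file is meant to be used as well, PLAIN would also have to appear in sasl.enabled.mechanisms.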
Start the producer and consumer
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config config/producer.properties
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config config/consumer.properties
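This example is based on the following document:
Kafka authentication using SASL/SCRAM
Could you post a SASL/SCRAM authentication example for a cluster started in KRaft mode?
Hello! The notes directory has hands-on examples for the various SASL mechanisms. Are there any hands-on examples for ACLs?
One question: if an ACL has already been configured for a topic, what has to be added to the basic command to start a consumer for that topic? Many thanks!
This article should help: Kafka authentication and ACLs
Yes, I have studied that article! It mostly covers how to configure ACL permissions on a topic. What I am asking is which command starts a consumer for a topic that has ACLs configured. With bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test it cannot connect, of course. How should the command be changed? Thanks!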
kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice-secret";
};
Run:
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config config/producer.properties
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config config/consumer.properties
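One detail that tends to matter once ACLs are enabled: a consumer needs read access to its consumer group as well as to the topic, and the console consumer picks a generated group id unless --group is given. The sketch below assumes a ZooKeeper-based authorizer at localhost:2181, the user alice, and a group name test-group that is made up for this example; KAFKA_HOME is likewise a hypothetical variable.

```shell
#!/bin/sh
# Sketch only: grant consumer permissions to alice, then consume with a fixed group id.
KAFKA_HOME="${KAFKA_HOME:-.}"   # assumption: Kafka installation directory
GROUP="test-group"              # hypothetical consumer group name

if [ -x "$KAFKA_HOME/bin/kafka-acls.sh" ]; then
  # --consumer is a convenience option that grants what a consumer needs
  # on the given topic and group.
  "$KAFKA_HOME/bin/kafka-acls.sh" --authorizer-properties zookeeper.connect=localhost:2181 \
    --add --allow-principal User:alice --consumer --topic test --group "$GROUP"

  # Same consumer command as above, plus the group that the ACL covers.
  export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" --bootstrap-server localhost:9093 \
    --topic test --group "$GROUP" --consumer.config config/consumer.properties
else
  echo "Kafka scripts not found under $KAFKA_HOME; nothing was changed"
fi
```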
Hello, I set this up with Docker and Kafka will not start. It reports the error below. Any pointers would be appreciated.
ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.SecurityException: zookeeper.set.acl is true, but ZooKeeper client TLS configuration identifying at least kafka.server.KafkaConfig$@5158b42f.ZkSslClientEnableProp, kafka.server.KafkaConfig$@5158b42f.ZkClientCnxnSocketProp, and kafka.server.KafkaConfig$@5158b42f.ZkSslKeyStoreLocationProp was not present and the verification of the JAAS login file failed [java.security.auth.login.config=/opt/kafka/config/kafka_client_jaas.conf, zookeeper.sasl.client=default:true, zookeeper.sasl.clientconfig=default:Client]
    at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:445)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:191)
    at kafka.Kafka$.main(Kafka.scala:109)
    at kafka.Kafka.main(Kafka.scala)
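You enabled the check (zookeeper.set.acl is true), but the matching configuration was not found: according to the error, neither the ZooKeeper TLS client settings nor a valid ZooKeeper login section (named Client by default) in the JAAS file was present. Note also that the broker is being started with kafka_client_jaas.conf as its login config.
I am on kafka_2.11-2.1.1.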
jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin@123"
user_admin="admin@123"
user_producer="producer@123"
user_consumer="consumer@123";
};
KafkaClient{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka@123";
};
ZookeeperClient{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka@123";
};
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/root/upgrep/kafka_2.11-2.1.1/config/jaas.conf"
fi
Error:
Any pointers would be much appreciated, thank you.
Have you verified that all of these users are set up correctly? For example:
## Verify credentials
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name alice
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name admin
I am on Kafka 2.2.0 and added a kafka_server_jaas.conf file:
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
compassword="admin-unicom";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka-unicom";
};
I also added this to Kafka's kafka-run-class.sh:
# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
    KAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka/kafka_2.12-2.2.0/config/kafka_server_jaas.conf"
fi
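It still fails on startup.
Could you take a look? Thanks.
The client version is too old.
That is what the error message says when translated.
But the point is that Kafka itself fails to start. Could it be a problem with Kafka 2.2.0?
SSL prevents Kafka from using zero copy. Do authentication modes other than SASL_PLAINTEXT have the same effect?
There is too little information here. Please post a detailed question in the Q&A section.
Hello, after configuring permissions, the first connection takes more than 50 seconds. Is there any way to optimize this?
You could add CPU to speed up the computation, but it is probably not worth it.
Hello, after enabling permissions, some of the .sh scripts stopped working, for example kafka-consumer-groups.sh. Is there a tutorial?
Friend, you left out a command.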
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
Hello, is this kafka_client_jaas.conf the consumer's conf? I added it and it still does not work:
[root@cdh_app_server02 kafka_2.12-2.2.0]# KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka_2.12-2.2.0/config/cjfconsumerzhs.conf" bin/kafka-consumer-groups.sh --bootstrap-server 172.19.27.16:9092,172.19.27.46:9092,172.19.27.65:9092 --list
Error: Executing consumer group command failed due to org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listGroups(ConsumerGroupCommand.scala:131)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:57)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
    at org.apache.kafka.clients.admin.KafkaAdminClient$22.handleFailure(KafkaAdminClient.java:2615)
    at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:620)
    at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:736)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutPendingCalls(KafkaAdminClient.java:804)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1098)
    at java.lang.Thread.run(Thread.java:748)
Failed to find brokers to send ListGroups
Try specifying just one broker:
bin/kafka-consumer-groups.sh --bootstrap-server 172.19.27.16:9092 --list
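It still reports the same error.
Could I add you on WeChat or QQ?
Is your Kafka port perhaps no longer 9092? The address is wrong.
I can send messages and consume them: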
[root@host-192-168-51-9 kafka_2.12-2.2.0]# bin/kafka-console-producer.sh --broker-list 172.19.27.16:9092,172.19.27.46:9092,172.19.27.65:9092 --topic cjfzhs --producer.config /opt/kafka/kafka_2.12-2.2.0/config/cjfproducerzhs.conf
1
2
3
[root@esb_mysql_slave kafka_2.12-2.2.0]# bin/kafka-console-consumer.sh --bootstrap-server 172.19.27.16:9092,172.19.27.46:9092,172.19.27.65:9092 --topic cjfzhs --from-beginning --consumer.config /opt/kafka/kafka_2.12-2.2.0/config/cjfconsumerzhs.conf --group cjfGroupzhs
1
3
2
My setup has SCRAM authentication enabled.
Check the listeners= setting in your server.properties.
sasl.enabled.mechanisms=SCRAM-SHA-512
# Enable SCRAM for inter-broker communication, using the SCRAM-SHA-512 algorithm
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
listener.security.protocol.map=INSIDE:SASL_PLAINTEXT,OUTSIDE:PLAINTEXT
inter.broker.listener.name=INSIDE
listeners=INSIDE://:9092,OUTSIDE://:8088
advertised.listeners=INSIDE://172.19.27.16:9092,OUTSIDE://SjwESB1:8088
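This configuration of yours is a real mess.
Which tutorial did you follow?
I did not set it up. I will rebuild it tomorrow; I cannot stand looking at it either.
I have a question. I have recently been learning to configure SASL/SCRAM on Kafka 2.5.0, and after reading this document I have a few doubts:
1. kafka_server_jaas.conf configures org.apache.kafka.common.security.plain.PlainLoginModule required. Is that mandatory, or is it a mistake?
2. Does ZooKeeper need any configuration as part of the overall SASL/SCRAM setup? I do not see any in the document. If it does, how should it be configured?
3. Once Kafka is configured with SASL/SCRAM, how do tools such as Kafka Tool and Kafka Eagle connect to the cluster to manage it?
Is there any way to fix this? All of the .sh scripts fail.
How are you running them? What does the error look like? Could it be a problem with the installation?
As soon as SCRAM authentication is enabled, I get this error: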
[root@cdh_app_server02 kafka_2.12-2.2.0]# bin/kafka-consumer-groups.sh --bootstrap-server 172.19.27.16:9092,172.19.27.46:9092,172.19.27.65:9092 --describe --group cjfGroupzhs
Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:331)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:251)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:59)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
Could I add you on WeChat or QQ?
WeChat: bisaluo_tao
Append the following to the command:
--command-config ./sasl.conf
In sasl.conf, add the following configuration:
#### cat sasl.conf #####
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
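Put together, the two pieces from this thread (the JAAS file passed through KAFKA_OPTS and the --command-config file) could be combined roughly as follows. This is a sketch under assumptions: the JAAS file at /etc/kafka/kafka_client_jaas.conf holds a SCRAM user valid on this cluster, the broker address is the one quoted in the thread, and KAFKA_HOME plus the temporary location of sasl.conf are hypothetical choices for this example.

```shell
#!/bin/sh
# Sketch only: run kafka-consumer-groups.sh against a SASL_PLAINTEXT/SCRAM-SHA-512 listener.
KAFKA_HOME="${KAFKA_HOME:-.}"   # assumption: Kafka installation directory
SASL_CONF="${TMPDIR:-/tmp}/kafka-scram-demo/sasl.conf"   # hypothetical path
mkdir -p "$(dirname "$SASL_CONF")"

# Tells the admin client which protocol and mechanism to use.
cat > "$SASL_CONF" <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
EOF

# The JAAS file (KafkaClient section) supplies the username and password.
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"

if [ -x "$KAFKA_HOME/bin/kafka-consumer-groups.sh" ]; then
  "$KAFKA_HOME/bin/kafka-consumer-groups.sh" --bootstrap-server 172.19.27.16:9092 \
    --list --command-config "$SASL_CONF"
else
  echo "kafka-consumer-groups.sh not found under $KAFKA_HOME; only wrote $SASL_CONF"
fi
```

Without --command-config the tool falls back to a plaintext connection, which would explain timeouts such as "Timed out waiting for a node assignment" on a SASL-only listener.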