7.4 认证和acl
kafka附带一个可插拔的ACL(Access Control List 访问控制列表)
,它使用zookeeper来存储。通过在server.properties
中设置authorizer.class.name来启用:
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
Kafka acls的格式为 "Principal P is [Allowed/Denied] Operation O From Host H On Resource R”
,你可以使用Kafka authorizer CLI 来添加,删除或查询所有acl。默认情况下,如果ResourcePatterns与特定的资源R没有匹配,则除了超级用户之外,都不允许访问R。如果要更改该行为,可以在server.properties
中包含以下内容。
allow.everyone.if.no.acl.found=true
你也可以在server.properties
添加超级用户,像这样(注意分隔符是分号,因为SSL的用户名是逗号)。
super.users=User:Bob;User:Alice
默认情况下,SSL用户名的格式为“CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown”。可以通过在server.properties中设置自定义的PrincipalBuilder来改变它,如下所示:
principal.builder.class=CustomizedPrincipalBuilderClass
可以通过修改server.properties
中的sasl.kerberos.principal.to.local.rules自定义规则。sasl.kerberos.principal.to.local.rules的格式是一个列表,其中每个规则的工作方式与Kerberos 配置文件 (krb5.conf)中的auth_to_local相同。 也支持小写规则,可通过在规则的末尾添加“/L”,强制转移全部结果为小写。每个规则都以RULE
开头:并包含一个表达式,格式如下。 有关更多详细信息,请参阅kerberos文档。
RULE:[n:string](regexp)s/pattern/replacement/
RULE:[n:string](regexp)s/pattern/replacement/g
RULE:[n:string](regexp)s/pattern/replacement//L
RULE:[n:string](regexp)s/pattern/replacement/g/L
举个例子,添加规则,将user@MYDOMAIN.COM
转换为用户
,同时保持默认规则,示例如下:
sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT
命令行界面
Kafka认证管理CLI(和其他的CLI脚本)可以在bin目录中找到。CLI脚本名是kafka-acls.sh
。以下列出了所有脚本支持的选项:
选项 | 描述 | 默认 | 类型选择 |
---|---|---|---|
--add | 添加一个acl | Action | |
--remove | 移除一个acl | Action | |
--list | 列出acl | Action | |
--authorizer | authorizer的完全限定类名 | kafka.security.auth.SimpleAclAuthorizer | Configuration |
--authorizer-properties | key=val,传给authorizer进行初始化,例如:zookeeper.connect=localhost:2181 | Configuration | |
--cluster | 指定集群作为资源。 | Resource | |
--topic [topic-name] | 指定topic作为资源。 | Resource | |
--group [group-name] | 指定 consumer-group 作为资源。 | Resource | |
-allow-principal | 添加到允许访问的ACL中,Principal是PrincipalType:name格式。 你可以指定多个。 |
Principal | |
--deny-principal | 添加到拒绝访问的ACL中,Principal是PrincipalType:name格式。 你可以指定多个。 |
Principal | |
--allow-host | --allow-principal中的principal的IP地址允许访问。 | 如果--allow-principal指定的默认值是* ,则意味着指定“所有主机” |
Host |
--deny-host | 允许或拒绝的操作。 有效值为:读,写,创建,删除,更改,描述,ClusterAction,全部 |
ALL | Operation |
--operation | --deny-principal中的principals的IP地址拒绝访问。 | 如果 --deny-principal指定的默认值是 * 则意味着指定 "所有主机" | Host |
--producer | 为producer角色添加/删除acl。生成acl,允许在topic上WRITE, DESCRIBE和CREATE集群。 | Convenience | |
--consumer | 为consumer role添加/删除acl,生成acl,允许在topic上READ, DESCRIBE 和 consumer-group上READ。 | Convenience | |
--force | 假设所有操作都是yes,规避提示 | Convenience |
例子
添加acl
假设你要添加一个acl “以允许198.51.100.0和198.51.100.1,Principal为User:Bob和User:Alice对主题是Test-Topic有Read和Write的执行权限” 。可通过以下命令实现:bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
默认情况下,所有的principal在没有一个明确的对资源操作访问的acl都是拒绝访问的。在极少的情况下,acl允许访问所有的资源,但一些principal我们可以使用 --deny-principal 和 --deny-host来拒绝访问。例如,如果我们想让所有用户读取Test-topic,只拒绝IP为198.51.100.3的User:BadBob,我们可以使用下面的命令:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
需要注意的是
--allow-host
和deny-host
仅支持IP地址(主机名不支持)。上面的例子中通过指定--topic [topic-name]
作为资源选项添加ACL到一个topic。同样,用户通过指定--cluster和通过指定--group [group-name]消费者组添加ACL。删除acl
删除和添加是一样的,--add换成--remove选项,要删除第一个例子中添加的,可以使用下面的命令:bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
acl列表
我们可以通过--list
选项列出所有资源的ACL。假设要列出Test-topic,我们可以用下面的选项执行CLI所有的ACL:bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
添加或删除作为生产者或消费者的principal
acl管理添加/移除一个生产者或消费者principal是最常见的使用情况,所以我们增加更便利的选项处理这些情况。为主题Test-topic添加一个生产者User:Bob,我们可以执行以下命令:bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
同样,添加Alice作为主题Test-topic的消费者,用消费者组为Group-1,我们只用 --consumer 选项:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic test-topic --group Group-1
注意,消费者的选择,我们还必须指定消费者组。从生产者或消费者角色删除主体,我们只需要通过--remove选项。
您好!我报了以下错,是因为zookeeper少配置了什么吗?WARN SASL
configuration failed. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn) javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/profiles.d/ssl/kafka/kafka_server_jaas.conf'. at org.apache.zookeeper.client.ZooKeeperSaslClient.<init>(ZooKeeperSaslClient.java:189) at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1161) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1211)
不加acl配置时可以正常起,加了authorizer.class.name=kafka.security.authorizer.AclAuthorizer后报该错
错误很明显吧:
实际上我和这位的错误一样:https://www.orchome.com/6808 。加了Client之后报了同样的错误。我是根据这里来配置的:https://www.orchome.com/1966 。不知道这里是不是少了zookeeper相关的配置。
kafka 自带zookeeper集群如何添加plain 认证
在
config/zookeeper.properties
中。config/zookeeper.properties 中添加了如下配置:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000
定义了 /opt/kafka/config/zk_server_jaas.conf
Server { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-123" user_kafka="kafka-1234" user_producer="1234567"; };
zookeeper-server-start.sh 启动脚本改成了:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/kafka/config/zk_server_jaas.conf org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"
但是实际配置下来就是配置不生效,ZK 不需要密码仍然可以访问
Kafka 2.4版本之后kafka.security.auth.SimpleAclAuthorizer 改为:kafka.security.authorizer.AclAuthorizer
感谢提醒,稍后我来更新一下文章。
这个授权好像没有设置用户是否有权限创建topic?如果要设置用户是否有权限创建topic应该怎么设置呢?
只有超级用户,才有权限创建。
您好,前一段在您的指导下解决了kafka认证与程序消费问题,这两天想做一个kafka压力测试, 使用的方法是:
./kafka-producer-perf-test.sh --num-records 50000000 --record-size 3000 --throughput 100000 --topic test-rep-one --producer-props bootstrap.servers=server1.hzguode.com:9092,server2.hzguode.com:9092,server3.hzguode.com:9092 --producer.config /hadoop/app/kafka/config/producer.properties
现在遇到一个问题:不加 --producer.config会出现之前的disconnect错误,加上之后出现:
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
这个错误明明之前解决了,是不是测试命令不是这么加的?希望您能抽时间帮我看一下,谢谢了!
您好,一直在您这里学习,今天遇到了一个问题,
[zk: localhost:2181(CONNECTED) 0] rmr /brokers/topics/test5 Authentication is not valid : /brokers/topics/test5/partitions
test5 topic 删除的时候报这个错误,之前zookeeper和kafka做过SASL认证,能帮我看下是什么原因么?谢谢
那你zk也要认证登录呀
认证登录 是在命令上面修改么?能具体点么 有点懵....
1、可以采用连接方式:
zookeeper-client/bin/zkCli.sh -server `hostname -f`:2181
2、执行 zkCli.sh之前先执行(可添加到zkCli.sh最前面):
export JVMFLAGS="-Djava.security.auth.login.config=/etc/zookeeper/conf/zookeeper_jaas.conf"(个人采取方式,加入了/etc/profile环境变量)
3、先执行
source /etc/zookeeper/conf/zookeeper-env.sh
,再执行/usr/hdp/current/zookeeper-client/bin/zkCli.sh
好的 非常感谢 我去操作一下
一样的额,你要加认证的。
嗯嗯 谢谢已经解决了 但是有个新问题了, 就是我用
./kafka-producer-perf-test.sh --num-records 2000000 --record-size 3000 --throughput 100000 --topic test-rep-one --producer-props
这个命令测试时候200W数据没问题,上升到300W时候就报错了:
org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for test-rep-one-5: 30942 ms has passed since last append org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for test-rep-one-2: 30021 ms has passed since batch creation plus linger time
能不能指个大概方向,网上说的listener改ip,加大request时间都改了貌似不起作用。。。。头晕
过了30秒了,还没未发出。看这里:
https://www.orchome.com/511
搜request关键字。
非常感谢
大神,在javaj客户端配置了
props.put("security.protocol","SASL_PLAINTEXT"); props.put("sasl.mechanism","PLAIN");
报
Caused by: javax.security.auth.login.LoginException: 无法找到 LoginModule 类: org.apache.kafka.common.security.plain.PlainLoginModule
的错
大神,那个控制某个ip某个用户,这个用户假如在java端应该怎么配置
大神,我想问一下这里的用户可以和数据库表里的用户关联起来吗,如果不能的话,这里的用户有什么意义呢,靠什么区分呢
可不可以设置客户端Java KafkaConsumer的权限?
可以呀,acl就是控制用户的读写权限的。
就是我的java端(10.29.28.207),一个xshell端(10.29.180.131),怎么在131端控制java那边consumer的权限和producer的权限、
User:* has Deny permission for operations: Read from hosts: 10.29.28.207 User:* has Deny permission for operations: Write from hosts: 10.29.28.207 User:* has Allow permission for operations: Read from hosts: 10.29.180.131 User:* has Allow permission for operations: Write from hosts: 10.29.180.131
这个是远程的
String connectionString = SystemConfigProperties.getProperty("kafka.bootstrap.servers", "127.0.0.1:9092"); Properties props = new Properties(); props.put("bootstrap.servers", connectionString); props.put("group.id", groupid); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", SystemConfigProperties.getProperty("kafka.auto.commit.interval.ms", "1000")); props.put("session.timeout.ms", SystemConfigProperties.getProperty("kafka.session.timeout.ms", "30000")); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); String type="earliest"; if(type.equals("earliest")){ props.put("auto.offset.reset", "earliest"); }
这个是java的,发现还是没有效果