kafka认证和acl

半兽人 发表于: 2016-05-06   最后更新时间: 2018-09-04 23:14:45  
{{totalSubscript}} 订阅, 38,504 游览

7.4 认证和acl

kafka附带一个可插拔的ACL(Access Control List 访问控制列表),它使用zookeeper来存储。通过在server.properties中设置authorizer.class.name来启用:

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

Kafka acls的格式为 "Principal P is [Allowed/Denied] Operation O From Host H On Resource R”,你可以使用Kafka authorizer CLI 来添加,删除或查询所有acl。默认情况下,如果ResourcePatterns与特定的资源R没有匹配,则除了超级用户之外,都不允许访问R。如果要更改该行为,可以在server.properties中包含以下内容。

allow.everyone.if.no.acl.found=true

你也可以在server.properties添加超级用户,像这样(注意分隔符是分号,因为SSL的用户名是逗号)。

super.users=User:Bob;User:Alice

默认情况下,SSL用户名的格式为“CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown”。可以通过在server.properties中设置自定义的PrincipalBuilder来改变它,如下所示:

principal.builder.class=CustomizedPrincipalBuilderClass

可以通过修改server.properties中的sasl.kerberos.principal.to.local.rules自定义规则。sasl.kerberos.principal.to.local.rules的格式是一个列表,其中每个规则的工作方式与Kerberos 配置文件 (krb5.conf)中的auth_to_local相同。 也支持小写规则,可通过在规则的末尾添加“/L”,强制转移全部结果为小写。每个规则都以RULE开头:并包含一个表达式,格式如下。 有关更多详细信息,请参阅kerberos文档。

RULE:[n:string](regexp)s/pattern/replacement/
RULE:[n:string](regexp)s/pattern/replacement/g
RULE:[n:string](regexp)s/pattern/replacement//L
RULE:[n:string](regexp)s/pattern/replacement/g/L

举个例子,添加规则,将user@MYDOMAIN.COM转换为用户,同时保持默认规则,示例如下:

sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT

命令行界面

Kafka认证管理CLI(和其他的CLI脚本)可以在bin目录中找到。CLI脚本名是kafka-acls.sh。以下列出了所有脚本支持的选项:

选项 描述 默认 类型选择
--add 添加一个acl Action
--remove 移除一个acl Action
--list 列出acl Action
--authorizer authorizer的完全限定类名 kafka.security.auth.SimpleAclAuthorizer Configuration
--authorizer-properties key=val,传给authorizer进行初始化,例如:zookeeper.connect=localhost:2181 Configuration
--cluster 指定集群作为资源。 Resource
--topic [topic-name] 指定topic作为资源。 Resource
--group [group-name] 指定 consumer-group 作为资源。 Resource
-allow-principal 添加到允许访问的ACL中,Principal是PrincipalType:name格式。
你可以指定多个。
Principal
--deny-principal 添加到拒绝访问的ACL中,Principal是PrincipalType:name格式。
你可以指定多个。
Principal
--allow-host --allow-principal中的principal的IP地址允许访问。 如果--allow-principal指定的默认值是*,则意味着指定“所有主机” Host
--deny-host 允许或拒绝的操作。
有效值为:读,写,创建,删除,更改,描述,ClusterAction,全部
ALL Operation
--operation --deny-principal中的principals的IP地址拒绝访问。 如果 --deny-principal指定的默认值是 * 则意味着指定 "所有主机" Host
--producer 为producer角色添加/删除acl。生成acl,允许在topic上WRITE, DESCRIBE和CREATE集群。 Convenience
--consumer 为consumer role添加/删除acl,生成acl,允许在topic上READ, DESCRIBE 和 consumer-group上READ。 Convenience
--force 假设所有操作都是yes,规避提示 Convenience

例子

  • 添加acl
    假设你要添加一个acl “以允许198.51.100.0和198.51.100.1,Principal为User:Bob和User:Alice对主题是Test-Topic有Read和Write的执行权限” 。可通过以下命令实现:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
    

    默认情况下,所有的principal在没有一个明确的对资源操作访问的acl都是拒绝访问的。在极少的情况下,acl允许访问所有的资源,但一些principal我们可以使用 --deny-principal 和 --deny-host来拒绝访问。例如,如果我们想让所有用户读取Test-topic,只拒绝IP为198.51.100.3的User:BadBob,我们可以使用下面的命令:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
    

    需要注意的是--allow-hostdeny-host仅支持IP地址(主机名不支持)。上面的例子中通过指定--topic [topic-name]作为资源选项添加ACL到一个topic。同样,用户通过指定--cluster和通过指定--group [group-name]消费者组添加ACL。

  • 删除acl
    删除和添加是一样的,--add换成--remove选项,要删除第一个例子中添加的,可以使用下面的命令:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
    
  • acl列表
    我们可以通过--list选项列出所有资源的ACL。假设要列出Test-topic,我们可以用下面的选项执行CLI所有的ACL:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
    
  • 添加或删除作为生产者或消费者的principal
    acl管理添加/移除一个生产者或消费者principal是最常见的使用情况,所以我们增加更便利的选项处理这些情况。为主题Test-topic添加一个生产者User:Bob,我们可以执行以下命令:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
    

    同样,添加Alice作为主题Test-topic的消费者,用消费者组为Group-1,我们只用 --consumer 选项:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic test-topic --group Group-1
    

    注意,消费者的选择,我们还必须指定消费者组。从生产者或消费者角色删除主体,我们只需要通过--remove选项。

更新于 2018-09-04

木木&很呆 7月前

kafka 自带zookeeper集群如何添加plain 认证

config/zookeeper.properties中。

config/zookeeper.properties 中添加了如下配置:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

定义了 /opt/kafka/config/zk_server_jaas.conf

Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin-123"
        user_kafka="kafka-1234"
        user_producer="1234567";
};

zookeeper-server-start.sh 启动脚本改成了:

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS  -Djava.security.auth.login.config=/opt/kafka/config/zk_server_jaas.conf   org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"

但是实际配置下来就是配置不生效,ZK 不需要密码仍然可以访问

陈健飞 2年前

Kafka 2.4版本之后kafka.security.auth.SimpleAclAuthorizer 改为:kafka.security.authorizer.AclAuthorizer

半兽人 -> 陈健飞 2年前

感谢提醒,稍后我来更新一下文章。

Simon 4年前

这个授权好像没有设置用户是否有权限创建topic?如果要设置用户是否有权限创建topic应该怎么设置呢?

半兽人 -> Simon 4年前

只有超级用户,才有权限创建。

您好,前一段在您的指导下解决了kafka认证与程序消费问题,这两天想做一个kafka压力测试, 使用的方法是:

./kafka-producer-perf-test.sh --num-records 50000000 --record-size 3000 --throughput 100000 --topic test-rep-one --producer-props bootstrap.servers=server1.hzguode.com:9092,server2.hzguode.com:9092,server3.hzguode.com:9092 --producer.config /hadoop/app/kafka/config/producer.properties

现在遇到一个问题:不加 --producer.config会出现之前的disconnect错误,加上之后出现:

Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set

这个错误明明之前解决了,是不是测试命令不是这么加的?希望您能抽时间帮我看一下,谢谢了!

您好,一直在您这里学习,今天遇到了一个问题,

[zk: localhost:2181(CONNECTED) 0] rmr /brokers/topics/test5
Authentication is not valid : /brokers/topics/test5/partitions

test5 topic 删除的时候报这个错误,之前zookeeper和kafka做过SASL认证,能帮我看下是什么原因么?谢谢

那你zk也要认证登录呀

认证登录 是在命令上面修改么?能具体点么 有点懵....

1、可以采用连接方式:

zookeeper-client/bin/zkCli.sh -server `hostname -f`:2181

2、执行 zkCli.sh之前先执行(可添加到zkCli.sh最前面):

export JVMFLAGS="-Djava.security.auth.login.config=/etc/zookeeper/conf/zookeeper_jaas.conf"(个人采取方式,加入了/etc/profile环境变量)

3、先执行source /etc/zookeeper/conf/zookeeper-env.sh,再执行/usr/hdp/current/zookeeper-client/bin/zkCli.sh

好的 非常感谢 我去操作一下

一样的额,你要加认证的。

嗯嗯 谢谢已经解决了 但是有个新问题了, 就是我用

./kafka-producer-perf-test.sh --num-records 2000000 --record-size 3000 --throughput 100000 --topic test-rep-one --producer-props

这个命令测试时候200W数据没问题,上升到300W时候就报错了:

org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for test-rep-one-5: 30942 ms has passed since last append org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for test-rep-one-2: 30021 ms has passed since batch creation plus linger time

能不能指个大概方向,网上说的listener改ip,加大request时间都改了貌似不起作用。。。。头晕

过了30秒了,还没未发出。看这里:
https://www.orchome.com/511
搜request关键字。

马踏紫陌 5年前

大神,在javaj客户端配置了

props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");

Caused by: javax.security.auth.login.LoginException: 无法找到 LoginModule 类: org.apache.kafka.common.security.plain.PlainLoginModule

的错

马踏紫陌 5年前

大神,那个控制某个ip某个用户,这个用户假如在java端应该怎么配置

马踏紫陌 5年前

大神,我想问一下这里的用户可以和数据库表里的用户关联起来吗,如果不能的话,这里的用户有什么意义呢,靠什么区分呢

马踏紫陌 5年前

可不可以设置客户端Java KafkaConsumer的权限?

半兽人 -> 马踏紫陌 5年前

可以呀,acl就是控制用户的读写权限的。

马踏紫陌 -> 半兽人 5年前

就是我的java端(10.29.28.207),一个xshell端(10.29.180.131),怎么在131端控制java那边consumer的权限和producer的权限、

User:* has Deny permission for operations: Read from hosts: 10.29.28.207
User:* has Deny permission for operations: Write from hosts: 10.29.28.207
User:* has Allow permission for operations: Read from hosts: 10.29.180.131
User:* has Allow permission for operations: Write from hosts: 10.29.180.131

这个是远程的

String connectionString = SystemConfigProperties.getProperty("kafka.bootstrap.servers", "127.0.0.1:9092");
  Properties props = new Properties();
  props.put("bootstrap.servers", connectionString);
  props.put("group.id", groupid);
  props.put("enable.auto.commit", "false");
  props.put("auto.commit.interval.ms", SystemConfigProperties.getProperty("kafka.auto.commit.interval.ms", "1000"));
  props.put("session.timeout.ms", SystemConfigProperties.getProperty("kafka.session.timeout.ms", "30000"));
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  String type="earliest";
  if(type.equals("earliest")){
   props.put("auto.offset.reset", "earliest");
  }

这个是java的,发现还是没有效果

马踏紫陌 5年前

31机器:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 10.29.180.132 --operation Read --topic Test-topic
bin/kafka-console-producer.sh --broker-list zk2:9092 --topic Test-test
>test2ettest2

32机器:

root@dspdevweb2 kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh --bootstrap-server zk3:9092 --topic Test-test --from-beginning
testtest
333
55555
test2ettest2
查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章