kafka认证和acl

7.4 认证和acl

kafka附带一个可插拔的ACL(Access Control List 访问控制列表),它使用zookeeper来存储。通过在server.properties中设置authorizer.class.name来启用:

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

Kafka acls的格式为 "Principal P is [Allowed/Denied] Operation O From Host H On Resource R”,你可以使用Kafka authorizer CLI 来添加,删除或查询所有acl。默认情况下,如果ResourcePatterns与特定的资源R没有匹配,则除了超级用户之外,都不允许访问R。如果要更改该行为,可以在server.properties中包含以下内容。

allow.everyone.if.no.acl.found=true

你也可以在server.properties添加超级用户,像这样(注意分隔符是分号,因为SSL的用户名是逗号)。

super.users=User:Bob;User:Alice

默认情况下,SSL用户名的格式为“CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown”。可以通过在server.properties中设置自定义的PrincipalBuilder来改变它,如下所示:

principal.builder.class=CustomizedPrincipalBuilderClass

可以通过修改server.properties中的sasl.kerberos.principal.to.local.rules自定义规则。sasl.kerberos.principal.to.local.rules的格式是一个列表,其中每个规则的工作方式与Kerberos 配置文件 (krb5.conf)中的auth_to_local相同。 也支持小写规则,可通过在规则的末尾添加“/L”,强制转移全部结果为小写。每个规则都以RULE开头:并包含一个表达式,格式如下。 有关更多详细信息,请参阅kerberos文档。

RULE:[n:string](regexp)s/pattern/replacement/
RULE:[n:string](regexp)s/pattern/replacement/g
RULE:[n:string](regexp)s/pattern/replacement//L
RULE:[n:string](regexp)s/pattern/replacement/g/L

举个例子,添加规则,将user@MYDOMAIN.COM转换为用户,同时保持默认规则,示例如下:

sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT

命令行界面

Kafka认证管理CLI(和其他的CLI脚本)可以在bin目录中找到。CLI脚本名是kafka-acls.sh。以下列出了所有脚本支持的选项:

选项 描述 默认 类型选择
--add 添加一个acl Action
--remove 移除一个acl Action
--list 列出acl Action
--authorizer authorizer的完全限定类名 kafka.security.auth.SimpleAclAuthorizer Configuration
--authorizer-properties key=val,传给authorizer进行初始化,例如:zookeeper.connect=localhost:2181 Configuration
--cluster 指定集群作为资源。 Resource
--topic [topic-name] 指定topic作为资源。 Resource
--group [group-name] 指定 consumer-group 作为资源。 Resource
-allow-principal 添加到允许访问的ACL中,Principal是PrincipalType:name格式。
你可以指定多个。
Principal
--deny-principal 添加到拒绝访问的ACL中,Principal是PrincipalType:name格式。
你可以指定多个。
Principal
--allow-host --allow-principal中的principal的IP地址允许访问。 如果--allow-principal指定的默认值是*,则意味着指定“所有主机” Host
--deny-host 允许或拒绝的操作。
有效值为:读,写,创建,删除,更改,描述,ClusterAction,全部
ALL Operation
--operation --deny-principal中的principals的IP地址拒绝访问。 如果 --deny-principal指定的默认值是 * 则意味着指定 "所有主机" Host
--producer 为producer角色添加/删除acl。生成acl,允许在topic上WRITE, DESCRIBE和CREATE集群。 Convenience
--consumer 为consumer role添加/删除acl,生成acl,允许在topic上READ, DESCRIBE 和 consumer-group上READ。 Convenience
--force 假设所有操作都是yes,规避提示 Convenience

例子

  • 添加acl
    假设你要添加一个acl “以允许198.51.100.0和198.51.100.1,Principal为User:Bob和User:Alice对主题是Test-Topic有Read和Write的执行权限” 。可通过以下命令实现:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
    

    默认情况下,所有的principal在没有一个明确的对资源操作访问的acl都是拒绝访问的。在极少的情况下,acl允许访问所有的资源,但一些principal我们可以使用 --deny-principal 和 --deny-host来拒绝访问。例如,如果我们想让所有用户读取Test-topic,只拒绝IP为198.51.100.3的User:BadBob,我们可以使用下面的命令:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
    

    需要注意的是--allow-hostdeny-host仅支持IP地址(主机名不支持)。上面的例子中通过指定--topic [topic-name]作为资源选项添加ACL到一个topic。同样,用户通过指定--cluster和通过指定--group [group-name]消费者组添加ACL。

  • 删除acl
    删除和添加是一样的,--add换成--remove选项,要删除第一个例子中添加的,可以使用下面的命令:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
    
  • acl列表
    我们可以通过--list选项列出所有资源的ACL。假设要列出Test-topic,我们可以用下面的选项执行CLI所有的ACL:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
    
  • 添加或删除作为生产者或消费者的principal
    acl管理添加/移除一个生产者或消费者principal是最常见的使用情况,所以我们增加更便利的选项处理这些情况。为主题Test-topic添加一个生产者User:Bob,我们可以执行以下命令:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
    

    同样,添加Alice作为主题Test-topic的消费者,用消费者组为Group-1,我们只用 --consumer 选项:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic test-topic --group Group-1
    

    注意,消费者的选择,我们还必须指定消费者组。从生产者或消费者角色删除主体,我们只需要通过--remove选项。






发表于: 3年前   最后更新时间: 9月前   游览量:21104
上一条: kafka SASL验证
下一条: kafka在正在运行的集群中整合安全功能

  • 您好,前一段在您的指导下解决了kafka认证与程序消费问题,这两天想做一个kafka压力测试,
    使用的方法是:
    ./kafka-producer-perf-test.sh --num-records 50000000 --record-size 3000 --throughput 100000 --topic test-rep-one --producer-props bootstrap.servers=server1.hzguode.com:9092,server2.hzguode.com:9092,server3.hzguode.com:9092 --producer.config /hadoop/app/kafka/config/producer.properties
    现在遇到一个问题:不加 --producer.config会出现之前的disconnect错误,加上之后出现:
    Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
    这个错误明明之前解决了,是不是测试命令不是这么加的?希望您能抽时间帮我看一下,谢谢了!
    您好,一直在您这里学习,今天遇到了一个问题,
    [zk: localhost:2181(CONNECTED) 0] rmr /brokers/topics/test5
    Authentication is not valid : /brokers/topics/test5/partitions

    test5 topic 删除的时候报这个错误,之前zookeeper和kafka做过SASL认证,能帮我看下是什么原因么?谢谢
    • 1、可以采用连接方式:zookeeper-client/bin/zkCli.sh -server `hostname -f`:2181
      2、执行 zkCli.sh之前先执行(可添加到zkCli.sh最前面):
      export JVMFLAGS="-Djava.security.auth.login.config=/etc/zookeeper/conf/zookeeper_jaas.conf"(个人采取方式,加入了/etc/profile环境变量)
      3、先执行source /etc/zookeeper/conf/zookeeper-env.sh,再执行/usr/hdp/current/zookeeper-client/bin/zkCli.sh
        • 嗯嗯 谢谢已经解决了 但是有个新问题了,
          就是我用./kafka-producer-perf-test.sh --num-records 2000000 --record-size 3000 --throughput 100000 --topic test-rep-one --producer-props 这个命令测试时候200W数据没问题,上升到300W时候就报错了:
          org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for test-rep-one-5: 30942 ms has passed since last append
          org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for test-rep-one-2: 30021 ms has passed since batch creation plus linger time

          能不能指个大概方向,网上说的listener改ip,加大request时间都改了貌似不起作用。。。。头晕
            大神,在javaj客户端 配置了props.put("security.protocol","SASL_PLAINTEXT");
              props.put("sasl.mechanism","PLAIN");,报Caused by: javax.security.auth.login.LoginException: 无法找到 LoginModule 类: org.apache.kafka.common.security.plain.PlainLoginModule的错
            大神,那个控制某个ip某个用户,这个用户假如在java端应该怎么配置
            大神,我想问一下这里的用户可以和数据库表里的用户关联起来吗,如果不能的话,这里的用户有什么意义呢,靠什么区分呢
            可不可以设置客户端Java KafkaConsumer的权限?
            • User:* has Deny permission for operations: Read from hosts: 10.29.28.207
               User:* has Deny permission for operations: Write from hosts: 10.29.28.207
               User:* has Allow permission for operations: Read from hosts: 10.29.180.131
               User:* has Allow permission for operations: Write from hosts: 10.29.180.131 
              这个是远程的
                • String connectionString = SystemConfigProperties.getProperty("kafka.bootstrap.servers", "127.0.0.1:9092");
                    Properties props = new Properties();
                    props.put("bootstrap.servers", connectionString);
                    props.put("group.id", groupid);
                    props.put("enable.auto.commit", "false");
                    props.put("auto.commit.interval.ms", SystemConfigProperties.getProperty("kafka.auto.commit.interval.ms", "1000"));
                    props.put("session.timeout.ms", SystemConfigProperties.getProperty("kafka.session.timeout.ms", "30000"));
                    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                    String type="earliest";
                    if(type.equals("earliest")){
                     props.put("auto.offset.reset", "earliest");
                    }
                  这个是java的,发现还是没有效果
                    31机器:bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 10.29.180.132 --operation Read --topic Test-topic
                    bin/kafka-console-producer.sh --broker-list zk2:9092 --topic Test-test
                    >test2ettest2
                    32机器:
                    root@dspdevweb2 kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh --bootstrap-server zk3:9092 --topic Test-test --from-beginning
                    testtest
                    333
                    55555
                    test2ettest2

                    ERROR Error when sending message to topic Test-topic with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
                    org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Test-topic]是因为没有配置authorizer.class.name吗

                    我启动之后显示认证失败,这是什么问题?
                    [2018-04-16 15:53:06,016] ERROR An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating Zookeeper Quorum Member's  received SASL token. Zookeeper Client will go to AUTH_FAILED state. (org.apache.zookeeper.client.ZooKeeperSaslClient)
                    [2018-04-16 15:53:06,016] ERROR SASL authentication with Zookeeper Quorum member failed: javax.security.sasl.SaslException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating Zookeeper Quorum Member's  received SASL token. Zookeeper Client will go to AUTH_FAILED state. (org.apache.zookeeper.ClientCnxn)
                    [2018-04-16 15:53:06,016] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient)
                    [2018-04-16 15:53:06,016] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
                    [2018-04-16 15:53:06,017] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
                    org.I0Itec.zkclient.exception.ZkAuthFailedException: Authentication failure
                    你好,单独设置这个好像没什么用,是否要跟kerberos之类的安全认证一起使用才可以。就算设置了acl,所有的用户还是都可以读写kafka