Kafka SASL验证

    原创
半兽人 发表于: 2017-05-03   最后更新时间: 2019-10-18  

7.3 使用SASL认证

1. JAAS配置

Kafka使用Java认证授权服务(JAAS)进行SASL配置。

  1. 为kafka broker配置JAAS

    KafkaServer是每个KafkaServer/Broker使用的JAAS文件中的名称。本节提供broker的SASL配置选项,包括进行broker之间通信所需的SASL客户端连接。如果将多个listeners配置为SASL,则名称可以在listeners名称前以小写字母开头,后跟一个句点,例如sasl_ssl.KafkaServer

    客户端部分用于验证与zookeeper的SASL连接。它还允许broker在zookeeper节点上设置SASL ACL。并锁定这些节点,以便只有broker可以修改它。所有的broker必须principal名称相同。如果要使用客户端以外的名称,设置zookeeper.sasl.client(例如,-Dzookeeper.sasl.clientconfig=ZkClient)。

    默认情况下,Zookeeper使用 “zookeeper” 作为服务名称。如果你需要修改,设置zookeeper.sasl.client.user(例如,-Dzookeeper.sasl.client.username=zk

    Broker还可以使用sasl.jaas.config配置JAAS。属性名称必须以包括SASL机制的listener前缀为前缀,例如:listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config只能指定一个登录模块。 如果listener上配置了多种机制,则listener必须使用和机制前缀为每种机制提供的配置。 例如:

    listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
       username="admin" \
       password="admin-secret";
    listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
       username="admin" \
       password="admin-secret" \
       user_admin="admin-secret" \
       user_alice="alice-secret";
    

    如果在不同级别定义了JAAS配置,则使用的优先级顺序为:

    • broker配置属性listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
    • 静态JAAS配置{listenerName}.KafkaServer
    • 静态JAAS配置KafkaServer

    请注意,只能使用静态JAAS配置来配置ZooKeeper JAAS配置。

    有关broker配置的示例,请参见GSSAPI(Kerberos)PLAINSCRAMOAUTHBEARER

  2. 为Kafka client配置JAAS

    客户端可以使用sasl.jaas.config或使用类似broker的静态JAAS配置文件配置JAAS。

    1. 客户端的JAAS配置

      客户端可以将JAAS配置指定给生产者或消费者,无需创建物理配置文件。通过为每个客户端指定不同的属性,此模式还使同一JVM中的不同生产者和消费者可以使用不同的凭据。如果同时指定了静态JAAS配置系统属性java.security.auth.login.config和客户端属性sasl.jaas.config,则将会使用客户端的属性。

      请参见GSSAPI(Kerberos)PLAINSCRAMOAUTHBEARER

    2. 静态文件配置JAAS

      使用静态JAAS配置文件来配置客户端上的SASL认证。

      1. 添加一个名为KafkaClient的客户端登录的JAAS配置文件。 在KafkaClient中为所选机制配置登录模块,如设置GSSAPI(Kerberos)PLAINSCRAM的示例中所述。 例如,GSSAPI凭据可以配置为:
        KafkaClient {
         com.sun.security.auth.module.Krb5LoginModule required
         useKeyTab=true
         storeKey=true
         keyTab="/etc/security/keytabs/kafka_client.keytab"
         principal="kafka-client-1@EXAMPLE.COM";
        };
        
      2. 将JAAS配置文件位置作为JVM参数传递给每个客户端JVM。 例如:
        -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
        

2. SASL配置

SASL可与PLAINTEXTSSL一起或分别用作安全协议传输层(SASL_PLAINTEXTSASL_SSL,如果使用SASL_SSL,则必须配置SSL)。

  1. SASL机制

    Kafka支持以下的SASL机制:

    • GSSAPI (Kerberos)
    • PLAIN
    • SCRAM-SHA-256
    • SCRAM-SHA-512
    • OAUTHBEARER
  2. 为Kafka broker配置SASL

    1. server.properteis配置一个SASL端口,SASL_PLAINTEXT或SASL_SSL添加到listeners中(至少一个),用逗号分隔:

      listeners=SASL_PLAINTEXT://host.name:port
      

      如果你只配置一个SASL端口(或者如果你需要broker使用SASL互相验证),那么需要确保broker之间设置相同的SASL协议:

      security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)

    2. 选择一个或多个支持的机制,并通过以下的步骤为机器配置SASL。在broker之间启用多个机制:

  3. 为Kafka client配置SASL

    SASL仅支持新的java生产者和消费者,不支持老的API。

    要在客户端上配置SASL验证,选择在broker中启用的客户端身份验证的SASL机制,并按照以下步骤配置所选机制的SASL。

开始SASL认证

  1. SASL之Kerberos认证
  2. SASL之PLAIN认证
  3. SASL之SCRAM认证
  4. 在broker中启用多个SASL机制
  5. 在运行的集群中修改SASL机制


您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容




上一条: kafka使用SSL加密和认证
下一条: kafka使用SASL/Kerberos认证

  • 你好,请问一下,现在kafka集群开启了sasl认证后,发现同一消费组中的消费者没有退出动作,但是一直在向kafka集群发送认证请求,请问下SASL认证是有时间限制吗?过段时间需要认证一次?

    请问这个kafka sasl配置完后,本地测试生产消费速度比原来慢很多,消费阻塞住了,这个是怎么回事?

    • props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
      props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

          props.put("sasl.jaas.config",
                  "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"Alice-123\";");
      
          Consumer<String, String> consumer = new KafkaConsumer<>(props);
          consumer.subscribe(Collections.singletonList("test2019"));
          while (true) {
              //这里是得到ConsumerRecords实例
              ConsumerRecords<String, String> records = consumer.poll(100);
              for (ConsumerRecord<String, String> record : records) {
      
                  System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
              }
          }
      } 
      

        在使用Kerberos认证环境下,使用kafka-console-consumer消费数据,当用zookeeper进行消费是出现如下错误。查看源码发现在源码中写死了只能获取Broker安全协议为PLAINTEXT的节点信息。大神,是不是使用zookeeper的方式,不能使用认证方式?

        kafka.common.BrokerEndPointNotAvailableException: End point PLAINTEXT not found for broker 0
            at kafka.cluster.Broker.getBrokerEndPoint(Broker.scala:141)
            at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:180)
            at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:180)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
            at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
            at scala.collection.AbstractTraversable.map(Traversable.scala:105)
            at kafka.utils.ZkUtils.getAllBrokerEndPointsForChannel(ZkUtils.scala:180)
            at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
        

        大神.,我配置了sasl之后,首先在服务器上不能创建消费增者,报disconnected (org.apache.kafka.clients.NetworkClient)

        错误,其次我用java端作为client端System.setProperty("java.security.auth.login.config","E:\CL\kafka_client_jaas.conf");
          props.put("security.protocol","SASL_PLAINTEXT");
          props.put("sasl.mechanism","PLAIN");
        配置了,启动之后看不到消费的数据

        kafka服务器同时启用kerberos和SSL不行吧?

        为什么oracle java要替换成java版本的JCE策略文件,不替换会有什么影响?

        我可不可以配置两个端口,一个使用sasl安全认证,一个不使用安全认证

        • 请问您一个问题,当kafka与zookeeper都启动了sasl/kerberos后,发现向一个不存在的topic生产消息时,kafka不会自动创建这个topic。已检查配置文件,保证开启自动创建topic功能,请问这是为什么?

            • 首先已经确认在kafka的配置文件server.properties中添加 auto.create.topics.enable=true。
              随后使用命令./kafka-console-producer.sh --broker-list 172.16.101.202:9092 --topic test9 --producer.config ../config/producer.properties
              并输入内容,回车后,提示如下报错

              [2019-11-15 14:14:08,832] WARN Error while fetching metadata with correlation id 0 : {test9=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
              [2019-11-15 14:14:09,034] WARN Error while fetching metadata with correlation id 1 : {test9=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
              [2019-11-15 14:14:09,137] WARN Error while fetching metadata with correlation id 2 : {test9=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
              [2019-11-15 14:14:09,240] WARN Error while fetching metadata with correlation id 3 : {test9=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
              [2019-11-15 14:14:09,343] WARN Error while fetching metadata with correlation id 4 : {test9=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
              

              zookeeper此时没有任何日志内容。
              kafka此时的日志内容:

              [2019-11-15 14:09:09,442] INFO Successfully authenticated client: authenticationID=kafka/nf5466c18-app@EXAMPLE.COM; authorizationID=kafka/nf5466c18-app@EXAMPLE.COM. (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler)
              [2019-11-15 14:09:09,442] INFO Setting authorizedID: kafka (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler)
              [2019-11-15 14:09:09,636] INFO Successfully authenticated client: authenticationID=kafka/nf5466c18-app@EXAMPLE.COM; authorizationID=kafka/nf5466c18-app@EXAMPLE.COM. (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler)
              [2019-11-15 14:09:09,636] INFO Setting authorizedID: kafka (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler)
              

              kerberos此时的日志内容:

              Nov 15 14:14:08 nf5466b12-app krb5kdc[86974](info): AS_REQ (4 etypes {18 17 16 23}) 172.***.***.202: ISSUE: authtime 1573798448, etypes {rep=18 tkt=18 ses=18}, kafka/nf5466c18-app@EXAMPLE.COM for krbtgt/EXAMPLE.COM@EXAMPLE.COM
              Nov 15 14:14:08 nf5466b12-app krb5kdc[86974](info): TGS_REQ (4 etypes {18 17 16 23}) 172.***.***.202: ISSUE: authtime 1573798448, etypes {rep=18 tkt=18 ses=18}, kafka/nf5466c18-app@EXAMPLE.COM for kafka/nf5466c18-app@EXAMPLE.COM
              

              上述各服务的日志都没有报错,随后我将kafka的日志级别调整至DEBUG时,启动kafka时会循环刷一个报错

              [2019-11-15 14:18:03,803] DEBUG Set SASL server state to HANDSHAKE_REQUEST (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
              [2019-11-15 14:18:03,803] DEBUG Handle Kafka request METADATA (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
              [2019-11-15 14:18:03,803] DEBUG Set SASL server state to FAILED (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
              [2019-11-15 14:18:03,803] DEBUG Connection with /172.***.***.202 disconnected (org.apache.kafka.common.network.Selector)
              java.io.IOException: org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected Kafka request of type METADATA during SASL handshake.
                      at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:243)
                      at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:64)
                      at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:338)
                      at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
                      at kafka.network.Processor.poll(SocketServer.scala:476)
                      at kafka.network.Processor.run(SocketServer.scala:416)
                      at java.lang.Thread.run(Thread.java:748)
              Caused by: org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected Kafka request of type METADATA during SASL handshake.
              
                • export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
                  bin/kafka-console-producer.sh --broker-list orchome:9093 --topic test --producer.config config/producer.properties
                  
                    • 已经将包含krb5配置文件的绝对路径和client_jass文件的绝对路径的参数添加到kafka-run-class.sh,并修改了kafka-console-producer.sh脚本,确保成功调用该参数,而且使用生产命令对已存在的topic进行生产时,不会提示任何报错,而且也可以实时消费到。 只是对一个不存在的topic进行生产时,才会出现报错