kafka 、zookeepr开启keberos认证后,利用java api还是可以不痛认证kerberos就可以删除kafka中的主题?

想喝好几罐八宝粥的男孩 发表于: 2022-04-11   最后更新时间: 2022-04-11 21:49:26   125 游览

在公司搭建单机版本的zk和kafka,zookeeper以及kakfa开启kerberos认证后,集群可以正常搭建,kafka发送数据及消费数据进行kerberos认证,但是在利用kakfa 服务端api在连接zookeeper后,可以删除主题和新建节点,不需要进行kerberos认证,很奇怪这是什么原因,麻烦有知道的给指点一下吧。我把我的集群中相关配置贴出来:


zookeeper相关配置

zoo.cfg:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/ceshi/zookeeper/data
dataLogDir=/home/ceshi/zookeeper/log
clientPort=22181
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
sessionRequireClientSASLAuth=true 
requireClientAuthScheme=sasl

zk_server_jaas.conf

Server {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   storeKey=true
   keyTab="/home/ceshi/zookeeper.keytab"
   principal="zookeeper/bigdata1@TEST.COM"
   userTicketCache=false;
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   storeKey=true
   keyTab="/home/ceshi/zk-client.keytab"
   principal="zk-client@TEST.COM"
   userTicketCache=false;
};

java.env

export SERVER_JVMFLAGS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/ceshi/zookeeper/zookeeper-3.4.10/conf/zk_server_jaas.conf"
export CLIENT_JVMFLAGS="${CLIENT_JVMFLAGS} -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/ceshi/zookeeper/zookeeper-3.4.10/conf/zk_server_jaas.conf -Dzookeeper.server.principal=zookeeper/bigdata1@TEST.COM"
#export JVMFLAGS="-Djava.security.auth.login.config=/home/ceshi/zookeeper/zookeeper-3.4.10/conf/zk_server_jaas.conf"

kafka相关配置:

server.properties

listeners=SASL_PLAINTEXT://bigdata1:29092
advertised.listeners=SASL_PLAINTEXT://bigdata1:29092
port=29092
host.name=bigdata1
security.inter.broker.protocol=SASL_PLAINTEXT  
sasl.enabled.mechanisms=GSSAPI  
sasl.mechanism.inter.broker.protocol=GSSAPI
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
sasl.kerberos.service.name=kafka

kafka_server_jaas.conf

KafkaServer{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName="kafka"
keyTab="/home/ceshi/kafka.keytab"
principal="kafka/bigdata1@TEST.COM";
};
KafkaClient{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName="kafka"
keyTab="/home/ceshi/kafka.keytab"
principal="kafka/bigdata1@TEST.COM"
userTicketCache=true;
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
keyTab="/home/ceshi/zk-client.keytab"
principal="zk-client@TEST.COM";
};

kafka-server-start.sh

最后末尾添加:

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/ceshi/kafka/kafka_2.11-1.1.1/config/kafka_server_jaas.conf   kafka.Kafka "$@"

kakfa api程序如下:

pom.xml如下:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
</dependency>
package com.ieslab.cn.kaftest;

import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.DeleteTopicsRequest;
import org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.kafka.common.security.JaasUtils;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.ConfigType;
import kafka.utils.ZkUtils;



public class TestConsumer {
    public static void main(String[] args) throws Exception  {

        ZkUtils zkUtils = ZkUtils. apply ("172.14.59.190:22181", 30000, 30000, JaasUtils.
                isZkSecurityEnabled());
        AdminUtils.createTopic(zkUtils,"test003",1,1,new Properties (),RackAwareMode.Enforced$.MODULE$) ;
                zkUtils.close();
    }
}

在这个程序中可以,进行新建和删除主题,引用的的kakfa服务端的pom,很奇怪,大家又遇同样情况的吗?

发表于 2022-04-11
添加评论

我没有测试过admin接口,但是如果你的消费者和生产的代码需要认证的话,那服务器的认证是有效的。

你去看看kafka的日志,是否有认证相关的日志输出。

都有的,我的意思是说为什么利用java api连接zk还是可以新建主题和删除主题呢?我前提是zk已经加了认证。

不走zk,调用是走kafka的呀,kafka验证过了,流程如下:

admin client -> kafka -> zk

那我这里不是直接调用的zk吗?zk直接能获取kakfa里面的topics信息,不用进行认证就可以直接删除主题。

ZkUtils zkUtils = ZkUtils. apply ("172.14.59.190:22181", 30000, 30000, JaasUtils.
                isZkSecurityEnabled());
AdminUtils.createTopic(zkUtils,"test003",1,1,new Properties (),RackAwareMode.Enforced$.MODULE$) ;
                zkUtils.close();

你看下引用的包,都是kafka包下的呀。

那既然引用的kakfa包下的,那我上面的代码不更应该需要直接走kerberos认证才能删除代码吗?为何我不用认证kerberos,就可以直接删除主题呢?我想问的是这个。

你整个java客户端都没有认证?那你执行

client.listTopics();

看看,如果这个也能查到,我就怀疑你的认证是否开启成功了。

参考:使用Java管理kafka集群

利用kakfa_clients的java api连接认证的kerberos kafka集群是无法进行主题列表和创建主题的,但是我在pom.xml直接引入如下包:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>1.1.1</version>
</dependency>

利用服务端的kakfa连接zookeeper是可以不用认证的的,利用zk可以直接创建主题或者删除主题,及时下面的代码。

ZkUtils zkUtils = ZkUtils. apply ("172.14.59.190:22181", 30000, 30000, JaasUtils.
                isZkSecurityEnabled());
AdminUtils.createTopic(zkUtils,"test003",1,1,new Properties (),RackAwareMode.Enforced$.MODULE$) ;
                zkUtils.close();
你的答案

查看kafka相关的其他问题或提一个您自己的问题
提问