kafka配置了sasl之后Consumer消费不了数据

scattered and scattered 发表于: 2020-01-18   最后更新时间: 2020-01-18  

kafka配置了sasl之后Consumer消费不了数据

screenshot


screenshot

怎么样可以看到这个日志呢?或者怎么可以排查这个问题呢?



您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容




上一条: kafka鉴权配置
下一条: kafka怎么配置用户是否有权限创建topic呢?

  • System.setProperty("java.security.auth.login.config", System.getProperty("user.dir") + "\\kafka_client_jaas.conf");
    

    试试这种,错误日志到服务器上看,如果认证错误会提示的。

    • 这个是配置在java里面吗?在服务器的哪个位置看呢

      while (true) {
      System.setProperty("java.security.auth.login.config", System.getProperty("user.dir") + "\kafka_client_jaas.conf");
      System.setProperty("sun.security.krb5.debug", "");
      System.out.println("xccxxx");
      ConsumerRecords records = consumer.poll(50);
      
        • 大佬。是这样的,公司让我做一个消费者,然后他们公司是有做鉴权的,也把他们的代码给我,发现就是卡在那,一直不下去,然后我才想自己的服务器装个kafka,模拟下的,现在是我Kafka启动了,但是加了鉴权后,就来拿启动kafka都失败了,

            • private static final String KAFKA_BROKER = "111.229.3.24:9092,122.173.12.42:9092";
              /**
               * 消费组id
               */
              private static String GROUP_ID = "test-consumer-group";
              /**
               * 消费主题
               */
              private static String TOPIC_NAME = "first";
              
              static {
                  try {
                      props = new Properties();
                      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);
                      // 指定消费者所属群组
                      props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
                      // 自动提交
                      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
                      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                      props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100000000");
                      /**kafka鉴权**/
                      props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";");
                      props.put("security.protocol", "SASL_PLAINTEXT");
                      props.put("sasl.mechanism", "PLAIN");
                  } catch (Exception e) {
                      logger.error("消费者" + e);
                  }
              }
              
              public static void main(String[] args) {
                  KafkaConsumer<string, string="" > consumer = null;
                  try {
                      consumer = new KafkaConsumer<string, string = "" > (props);
                      consumer.subscribe(Arrays.asList(new String[]{TOPIC_NAME}));
                      while (true) {
                          ConsumerRecords<string, string="" > records = consumer.poll(50);
                          for (ConsumerRecord<string, string="" > record :records){
                              //消费数据
                              System.out.println(record.value().toString() + ":sys打印的数据");
                              logger.info(record.value());
                          }
                          Thread.sleep(10);
                      }
                  } catch (Exception e) {
                      System.out.println("报错了:" + e);
                      logger.error("消费者" + e);
                  } finally {
                      consumer.close();
                  }
              }
              
                • 一致的,可以问下您,1.以大佬来看,这种没反应可能是什么问题呢?2.就是关于报错信息,没有任何回应,这个应该怎么获取呢?服务器也是他们的,我们登不上去的