连接不上kafka,日志提示Set SASL client state to FAILED

today 发表于: 2021-04-20   最后更新时间: 2021-04-21 00:05:50   1,914 游览

代码如下:

KafkaConsumer<String, String> consumer;
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "11.223.34.66:19093");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1500");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
// Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项:latest, earliest, none
properties.put("auto.offset.reset", "earliest");
// 批量一次最大拉取数据量
properties.put("max.poll.records", "30");
// 心跳
properties.put("heartbeat.interval.ms", "10000");
// 处理逻辑最大时间
properties.put("max.poll.interval.ms", "60000");
// 请求响应的最长等待时间
properties.put("request.timeout.ms", "65000");
// session超时时间
properties.put("session.timeout.ms", "30000");
//security.protocol
properties.put("security.protocol","SASL_PLAINTEXT");
// sasl.mechanism
properties.put("sasl.mechanism", "PLAIN");
String loginInfo = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
        + "kafkatest" + "\"  password=\"" + "5svs" + "\";";
// sasl.jaas.config
properties.put("sasl.jaas.config", loginInfo);

// 使用配置初始化 Kafka 消费者
consumer = new KafkaConsumer<>(properties);
try {
    // 订阅 Topic
    consumer.subscribe(Collections.singletonList("PersonInfo_323d5010195e11e93e80d17d1396110c"));
    // 轮询
    while (true) {
        // 消费消息
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {

            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            System.out.printf("kafka lastOffSet:%s\n ", lastOffset);
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    // 关闭消费者
    consumer.close();
}

具体报错日志如下:

[org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.setSaslState(SaslClientAuthenticator.java:209)] - [  Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE  ]
2021-04-20 20:44:42 - [DEBUG] - [org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:724)] - [  Completed connection to node -1. Fetching API versions.  ]
2021-04-20 20:44:42 - [TRACE] - [org.apache.kafka.clients.NetworkClient.leastLoadedNode(NetworkClient.java:543)] - [  Found least loaded node 122.225.193.235:19093 (id: -1 rack: null)  ]
2021-04-20 20:44:42 - [DEBUG] - [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.setSaslState(SaslClientAuthenticator.java:209)] - [  Set SASL client state to FAILED  ]
2021-04-20 20:44:42 - [WARN] - [org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:403)] - [  Unexpected error from 122.225.193.235/122.225.193.235; closing connection  ]
org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are [GSSAPI]
2021-04-20 20:44:42 - [DEBUG] - [org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:704)] - [  Node -1 disconnected.  ]
2021-04-20 20:44:42 - [WARN] - [org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:585)] - [  Connection to node -1 terminated during authentication. This may indicate that authentication failed due to invalid credentials.  ]
2021-04-20 20:44:42 - [DEBUG] - [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:482)] - [  Cancelled FIND_COORDINATOR request {api_key=10,api_version=1,correlation_id=0,client_id=consumer-1} with correlation id 0 due to node -1 being disconnected  ]

大佬帮忙指点下,哪里有问题?

在线,2小时前登录
发表于 2021-04-20
添加评论

有大佬可以帮忙看看吗?是验证方式不对吗?还是什么?

半兽人 -> today 3年前

SASL的验证我没看到额,最好命令验证通过之后,客户端在进行调试吧。

贴一下你配置,和你参考的例子,让我清楚你的步骤。
https://www.orchome.com/170

你的答案

查看kafka相关的其他问题或提一个您自己的问题