Error while fetching metadata for [__consumer_offsets,0]: leader not available (kafka.server.MetadataCache)

答人 发表于: 2017-12-21   最后更新时间: 2017-12-21 17:44:10   3,991 游览

kafka集群搭建好了。
./bin/kafka-console-producer.sh --broker-list 192.168.42.xx:9095 --topic topic12
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.42.xx:9095 --from-beginning --topic topic12

用命令的客户端测试。都能收到消息。但是用项目程序连接集群。消费数据,一直卡着不动。
版本是kafka_2.12-0.11.0.1
上面的日志是server.log的。
screenshot

项目DUBG日志为:

2017-12-21 17:41:39-[DEBUG]-(org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler:615) Group coordinator lookup for group DemoConsumer failed: The coordinator is not available.
2017-12-21 17:41:39-[DEBUG]-(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:225) Coordinator discovery failed for group DemoConsumer, refreshing metadata

发表于 2017-12-21
添加评论

telnet一下,地址通吗?

答人 -> 半兽人 6年前

telnet是通的,项目的生产者可以生产消息,用kafka消费端命令工具。可以消费数据。但就是项目的消费者,不能消费数据

半兽人 -> 答人 6年前

卡着不动是正常的,判断下主题是否有消息。

答人 -> 半兽人 6年前

项目生产者已经生产消息到队列中。消息者不能进行消费。队列有消息的。下面是翻阅源码,找到debug报错的位置

AbstractCoordinator 类的内部类



 private class GroupCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {





        @Override

        public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {

            log.debug("Received GroupCoordinator response {} for group {}", resp, groupId);





            FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();

            // use MAX_VALUE - node.id as the coordinator id to mimic separate connections

            // for the coordinator in the underlying network client layer

            // TODO: this needs to be better handled in KAFKA-1935

            Errors error = findCoordinatorResponse.error();

            clearFindCoordinatorFuture();

            if (error == Errors.NONE) {

                synchronized (AbstractCoordinator.this) {

                    AbstractCoordinator.this.coordinator = new Node(

                            Integer.MAX_VALUE - findCoordinatorResponse.node().id(),

                            findCoordinatorResponse.node().host(),

                            findCoordinatorResponse.node().port());

                    log.info("Discovered coordinator {} for group {}.", coordinator, groupId);

                    client.tryConnect(coordinator);

                    heartbeat.resetTimeouts(time.milliseconds());

                }

                future.complete(null);

            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {

                future.raise(new GroupAuthorizationException(groupId));

            } else {

//这个地方

                log.debug("Group coordinator lookup for group {} failed: {}", groupId, error.message());

                future.raise(error);

            }

        }

半兽人 -> 答人 6年前

集群不正常吧,这个错写的是leader不可用。

答人 -> 半兽人 6年前

集群不正常,具体是什么?项目可以写消息,但不能进行消费

答人 -> 半兽人 6年前

多谢,上面的问题,重新安装了kafka和zookeeper得到解决。只有感觉奇怪。项目能进行生产消息,但不能消费消息。具体的原因是什么到目前都还不知道。

半兽人 -> 答人 6年前

客气,我刚才我是让另外一位的重装,没想到你先重装了。
你这个错误明显是集群不正常。

你的答案

查看kafka相关的其他问题或提一个您自己的问题