Apache Kafka 获取指定主题的消费者列表

啊啊 发表于: 2021-11-27   最后更新时间: 2021-11-27 23:40:28   724 游览

有什么方法可以在java中获得特定主题的消费者列表吗?到现在为止,我能够获得主题列表:

final ListTopicsResult listTopicsResult = adminClient.listTopics();
KafkaFuture<Set<String>> kafkaFuture = listTopicsResult.names();
Set<String> map = kafkaFuture.get();

但我还没有找到一种方法来获得每个主题的消费者列表。

发表于 2021-11-27

我最近正在为我的kafka客户端工具解决同样的问题。挺麻烦的,但我从代码中发现的唯一方法是如下:

Properties props = ...//here you put your properties
AdminClient kafkaClient = AdminClient.create(props);

//Here you get all the consumer groups
List<String> groupIds = kafkaClient.listConsumerGroups().all().get().
                       stream().map(s -> s.groupId()).collect(Collectors.toList()); 

//Here you get all the descriptions for the groups
Map<String, ConsumerGroupDescription> groups = kafkaClient.
                                               describeConsumerGroups(groupIds).all().get();
for (final String groupId : groupIds) {
    ConsumerGroupDescription descr = groups.get(groupId);
    //find if any description is connected to the topic with topicName
    Optional<TopicPartition> tp = descr.members().stream().
                                  map(s -> s.assignment().topicPartitions()).
                                  flatMap(coll -> coll.stream()).
                                  filter(s -> s.topic().equals(topicName)).findAny();
            if (tp.isPresent()) {
                //you found the consumer, so collect the group id somewhere
            }
}

用的是kafkadmin客户端,参考来自:

你的答案

查看kubernetes相关的其他问题或提一个您自己的问题