G1-JVM

1 声望

这家伙太懒,什么都没留下

个人动态
  • G1-JVM 回复 G1-JVMkafka获取topic下所有消费组信息 中 :

    补充一下最后的解决办法,也许会有人用到
    我们的Kafka监控用的是kafka manager,是由雅虎公司开源发布的,现已更名为CMAK,
    该监控系统提供了一套API,用以获取kafk集群的一些信息,
    其中便有获取所有消费组的接口,定义如下:
    GET /api/status/:cluster/consumersSummary 其中:cluster为kafka客户端名称
    使用代码调用API的话,需要加上Http的基础验证,使用方法百度一下即可
    其他详细API地址为:https://github.com/yahoo/CMAK/blob/master/conf/routes

    2月前
  • G1-JVM 回复 半兽人kafka获取topic下所有消费组信息 中 :

    非常感谢回答,我会尝试的

    2月前
  • 半兽人 回复 G1-JVMkafka获取topic下所有消费组信息 中 :

    你的消费者组不活跃了,自然就找不到了:

    public List<String> activeConsumerByTopic(String topicName) {
            List<String> lists = new ArrayList<>();
            try (AdminClient client = KafkaAdminFactory.getInstance()) {
                try {
                    // get all consumer groupId
                    List<String> groupIds = client.listConsumerGroups().all().get().stream().map(s -> s.groupId()).collect(Collectors.toList());
                    // Here you get all the descriptions for the groups
                    Map<String, ConsumerGroupDescription> groups = client.describeConsumerGroups(groupIds).all().get();
                    for (final String groupId : groupIds) {
                        ConsumerGroupDescription descr = groups.get(groupId);
                        // find if any description is connected to the topic with topicName
                        Optional<TopicPartition> tp = descr.members().stream().
                                map(s -> s.assignment().topicPartitions()).
                                flatMap(coll -> coll.stream()).
                                filter(s -> s.topic().equals(topicName)).findAny();
                        if (tp.isPresent()) {
                            // you found the consumer, so collect the group id somewhere
                            lists.add(descr.groupId());
                        }
                    }
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
            return lists;
        }
    

    你可以安装一个我重写的kafka监控,看看是否可以获取到,参考:KafkaOffsetMonitor raft版:监控消费者和延迟的队列

    如果能达到你的预期,参考核心功能代码:
    https://github.com/orchome/KafkaOffsetMonitor/blob/main/src/main/java/www/orchome/com/kafka/core/KafkaService.java

    2月前
  • G1-JVM 回复 半兽人kafka获取topic下所有消费组信息 中 :

    这个命令有使用,但是正如问题描述的,我这里是查不到消费组下topic的信息的。不知道是否是因为我这里消费时,是用consumer.assign()方法手动指定消费位置,导致kafka无法管理消费组导致的

    2月前
  • 半兽人 回复 G1-JVMkafka获取topic下所有消费组信息 中 :

    listConsumerGroups

    参考:使用Java管理kafka集群

    2月前
  • 赞了 G1-JVMkafka频繁报 OutOfMemoryError: Direct buffer memory 异常 的评论!

    看了下,是你的消息堵塞的太严重了,来不及发送,越堆越多,导致的oom。

    • 减少延迟,因为你的消息量级已经够了,不要等待0.1秒了,改成1或者0

      prop.put(ProducerConfig.LINGER_MS_CONFIG, 100);// 该参数是控制消息发送延时的 默认参数是0
      
    • batch.size,虽然加大了,但是你带宽不够,来不及发送。

      prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576);// 默认参数事16384即16KB
      
    • acks=0
      acks=0 如果设置为0,那么生产者将不等待任何消息确认。消息将立刻添加到socket缓冲区并考虑发送。在这种情况下不能保障消息被服务器接收到。并且重试机制不会生效(因为客户端不知道故障了没有)。每个消息返回的offset始终设置为-1。

    • 更多生产者配置,参考:Kafka Producer配置

    最后

    其实就是消息太多,而来不及发送,导致的。

    很大可能是kafka服务器节点的硬件(如网络)到达了瓶颈,你可以通过增加分区数/或者kafka节点数来分摊压力,提高发送速度,减少堵塞的消息量。

    9月前