我调用KafkaAdminClient动态删除消费者的读权限的时候,消费者直接申请了离开小组的请求,然后再赋予权限消费者还是无法消费消息,必须重启消费者所在微服务才行。
下面是删除权限部分代码
/**
* 删除权限
* @param resourceType topic类型还是group类型
* @param name topic名或者group名
* @param principal 用户名
* @param aclOperation 读写权限
*/
protected void deleteAcl(ResourceType resourceType, String name, String principal, AclOperation aclOperation) {
ResourcePatternFilter resourcePatternFilter = new ResourcePatternFilter(resourceType, name, PatternType.LITERAL);
AccessControlEntryFilter accessControlEntryFilter=new AccessControlEntryFilter ("User:" + principal, "*", aclOperation, AclPermissionType.ALLOW);
AclBindingFilter aclBinding=new AclBindingFilter(resourcePatternFilter,accessControlEntryFilter);
Collection<AclBindingFilter> aclBindingCollection= new ArrayList<>();
aclBindingCollection.add(aclBinding);
DeleteAclsResult aclResult = adminClient.deleteAcls(aclBindingCollection);
KafkaFuture<Collection<AclBinding>> result = aclResult.all();
try {
result.get();
if (result.isDone()){
System.out.println(result.toString());
}
} catch (Exception e) {
e.printStackTrace();
}
}
下面是消费者代码
@KafkaListener(topics = "${kafka.topic.name}", groupId = "#{'${kafka.consumer.group.id}'.split(',')[0]}")
public void recieveData(ConsumerRecord consumerRecord) {
log.info("Data - " + consumerRecord.value().toString() + " recieved");
try {
log.info("Group1 - 模拟业务开始");
Thread.sleep(1000);
log.info("Group1 - 模拟业务结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
错误日志截图,我也不懂哪些错误信息比较有用,我目前认为应该是消费者被删除权限之后,由于没有权限主动申请离开了消费组,再次赋予权限的时候他并没有发送重新加入小组的请求也就无法继续消费消息。能不能告诉我是我调用KafkaAdminClient写错了还是消费者部分写错了。最好能给出相应的解决方案。
你删除和加入权限的代码没有任何问题,这个主要是springboot的消费者加载问题。
没有逻辑重新初始化这个消费者了,所以你需要重新启动这个微服务,进行重新初始化。
我个人觉得这个行为是非常正确的,权限一旦确认就不会反复修改,所以这个场景本身就非常非常少。
你想,什么情况下消费者组会脱离,oom或微服务宕机的时候才会脱离,本身就与进程共存亡的。所以也没有人去专门写个逻辑,去维护你这个场景。一直去尝试加入消费者试错权限才是最大的灾难。
好的,谢谢大佬
你的答案