kafka在正在运行的集群中整合安全功能

原创
半兽人 发表于: 2016-07-29   最后更新时间: 2019-03-22 10:02:39  
{{totalSubscript}} 订阅, 12,119 游览

7.5 将安全功能合并到运行的群集中

你可以为正在运行的集群增加1个或多个我们前面讨论的安全协议,这是分阶段完成的:

  • 以增量替换的方式添加额外的安全端口(s)。
  • 客户端使用安全的端口来连接,而不是PLAINTEXT端口的(假设你是客户端需要安全连接broker)。
  • 再次增量的方式依次启用broker与broker之间的安全端口(如果需要)
  • 最后依次关闭PLAINTEXT端口。

7.2和7.3节介绍了配置SSL和SASL的具体步骤。 按照以下步骤启用所需的安全协议。

broker与clientbroker与broker之间配置安全通讯协议。这些都必须新增启用,PLAINTEXT端口必须保留,是为了broker或客户端可以继续通讯。

当依次替换时,broker通过SIGTERM进行清理。等待重新启动的副本在移动到下一个节点之前返回到ISR列表也是一个很好的做法。

举个例子,假设我们希望在broker与clientbroker与broker之间使用SSL进行通讯加密,那么需要在每个节点上打开SSL端口:

listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092

然后,我们重新启动client,改变指向新的安全端口:

bootstrap.servers = [broker1:9092,...]
security.protocol = SSL
...etc

设置broker-broker协议(同样使用SSL端口):

listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
security.inter.broker.protocol=SSL

最后,我们关闭PLAINTEXT端口:

listeners=SSL://broker1:9092
security.inter.broker.protocol=SSL

另外,我们也可以打开多个端口,使用不同协议实现broker-broker和broker-client之间通讯。假设我们希望都使用SSL加密(即,broker-broker和broker-client通讯),但是我们也想对broker-client连接增加SASL认证,我们通过打开2个额外的端口来实现这一点:

listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093

然后,重新启动客户端,改变他们的配置指向新的SASL&SSL安全端口:

bootstrap.servers = [broker1:9093,...]
security.protocol = SASL_SSL
...etc

然后,服务器将逐步切换,切换broker-broker之间的通讯到SSL。

listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
security.inter.broker.protocol=SSL

最后,关闭PLAINTEXT端口.

listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
security.inter.broker.protocol=SSL

ZooKeeper可以独立于Kafka集群进行安全保护。第7.6.2节将介绍相关步骤。

更新于 2019-03-22

假设配置为listeners=PLAINTEXT://broker1:9091,SASL://broker1:9092,切换完成之后9091端口还能接受到没人认证到请求吗

可以,你必须要关掉这个出口

我碰到一个问题,目前有两个节点都是没有认证的,在切换一台之后查看topic的详情的时候ISR还是只有一个,不知道为何
配置如下
(无认证)
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zk:2181
KAFKA_LISTENERS: PLAINTEXT://:19092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://xx.xx.xx.xx:19092
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_LOG_CLEANUP_POLICY: "delete"
KAFKA_DELETE_TOPIC_ENABLE: "true"
(认证)
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zk:2181
KAFKA_LISTENERS: PLAINTEXT://:29092,SASL_PLAINTEXT://:29093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://xx.xx:29092,SASL_PLAINTEXT://xx.xx:29093
KAFKA_OPTS: -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf
KAFKA_LOG_CLEANUP_POLICY: "delete"
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
KAFKA_SUPER_USERS: User:admin
KAFKA_SASL_ENABLED_MECHANISMS: PLAINTEXT.
消费者就停止了
这个异常信息
Exception in thread "main" org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: 1

KAFKA_ADVERTISED_LISTENERS 这个已经弃用了呀。

那我注释掉这个参数也是一样的结果

看了下文档, 这个没被弃用啊

消费者指定端口了么,broker之间正常吗?

应该是broker通信有问题
Listeners are not identical across brokers: Map(2 -> Map(ListenerName(PLAINTEXT) -> xx:29092 (id: 2 rack: null), ListenerName(SASL_PLAINTEXT) -> xx:29093 (id: 2 rack: null)), 1 -> Map(ListenerName(PLAINTEXT) -> xx:19092 (id: 1 rack: null))) (kafka.server.MetadataCache)
这个怎么解决

还有其他的错误么。这个broker1的错,还是broker2报的。

半兽人 -> 半兽人 5年前

最好发个问题帖吧

发了个问题贴https://www.orchome.com/1521 ,帮我看下,十分感谢

这个日志是在controller让broker更新matadata的时候爆出来的,我个人认为不影响使用,我也遇到了这个问题

这个是升级过程中集群中的broker listen name没有对齐的原因,都升级了就好了

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章