kafka在正在运行的集群中整合安全功能

    原创
半兽人 发表于: 2016-07-29   最后更新时间: 2019-03-22  

7.5 将安全功能合并到运行的群集中

你可以为正在运行的集群增加1个或多个我们前面讨论的安全协议,这是分阶段完成的:

  • 以增量替换的方式添加额外的安全端口(s)。
  • 客户端使用安全的端口来连接,而不是PLAINTEXT端口的(假设你是客户端需要安全连接broker)。
  • 再次增量的方式依次启用broker与broker之间的安全端口(如果需要)
  • 最后依次关闭PLAINTEXT端口。

7.2和7.3节介绍了配置SSL和SASL的具体步骤。 按照以下步骤启用所需的安全协议。

broker与clientbroker与broker之间配置安全通讯协议。这些都必须新增启用,PLAINTEXT端口必须保留,是为了broker或客户端可以继续通讯。

当依次替换时,broker通过SIGTERM进行清理。等待重新启动的副本在移动到下一个节点之前返回到ISR列表也是一个很好的做法。

举个例子,假设我们希望在broker与clientbroker与broker之间使用SSL进行通讯加密,那么需要在每个节点上打开SSL端口:

listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092

然后,我们重新启动client,改变指向新的安全端口:

bootstrap.servers = [broker1:9092,...]
security.protocol = SSL
...etc

设置broker-broker协议(同样使用SSL端口):

listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
security.inter.broker.protocol=SSL

最后,我们关闭PLAINTEXT端口:

listeners=SSL://broker1:9092
security.inter.broker.protocol=SSL

另外,我们也可以打开多个端口,使用不同协议实现broker-broker和broker-client之间通讯。假设我们希望都使用SSL加密(即,broker-broker和broker-client通讯),但是我们也想对broker-client连接增加SASL认证,我们通过打开2个额外的端口来实现这一点:

listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093

然后,重新启动客户端,改变他们的配置指向新的SASL&SSL安全端口:

bootstrap.servers = [broker1:9093,...]
security.protocol = SASL_SSL
...etc

然后,服务器将逐步切换,切换broker-broker之间的通讯到SSL。

listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
security.inter.broker.protocol=SSL

最后,关闭PLAINTEXT端口.

listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
security.inter.broker.protocol=SSL

ZooKeeper可以独立于Kafka集群进行安全保护。第7.6.2节将介绍相关步骤。



您需要解锁本帖隐藏内容请: 点击这里
本帖隐藏的内容




上一条: kafka认证和acl
下一条: ZooKeeper认证【kafka】

  • 假设配置为listeners=PLAINTEXT://broker1:9091,SASL://broker1:9092,切换完成之后9091端口还能接受到没人认证到请求吗

    • 我碰到一个问题,目前有两个节点都是没有认证的,在切换一台之后查看topic的详情的时候ISR还是只有一个,不知道为何
      配置如下
      (无认证)
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zk:2181
      KAFKA_LISTENERS: PLAINTEXT://:19092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://xx.xx.xx.xx:19092
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_LOG_CLEANUP_POLICY: "delete"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      (认证)
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zk:2181
      KAFKA_LISTENERS: PLAINTEXT://:29092,SASL_PLAINTEXT://:29093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://xx.xx:29092,SASL_PLAINTEXT://xx.xx:29093
      KAFKA_OPTS: -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf
      KAFKA_LOG_CLEANUP_POLICY: "delete"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_SUPER_USERS: User:admin
      KAFKA_SASL_ENABLED_MECHANISMS: PLAINTEXT.
      消费者就停止了
      这个异常信息
      Exception in thread "main" org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: 1

        • 应该是broker通信有问题
          Listeners are not identical across brokers: Map(2 -> Map(ListenerName(PLAINTEXT) -> xx:29092 (id: 2 rack: null), ListenerName(SASL_PLAINTEXT) -> xx:29093 (id: 2 rack: null)), 1 -> Map(ListenerName(PLAINTEXT) -> xx:19092 (id: 1 rack: null))) (kafka.server.MetadataCache)
          这个怎么解决