kafka使用SSL加密和认证

原创 kubernetes k8s
半兽人 发表于: 2016-05-03   最后更新时间: 2021-09-28 15:58:36  
{{totalSubscript}} 订阅, 60,655 游览

7.2 使用SSL加密和认证

Apache kafka 允许clinet通过SSL连接,SSL默认是不可用的,需手动开启。

1. 为每个Kafka broker生成SSL密钥和证书。

部署HTTPS,第一步是为集群的每台机器生成密钥和证书,可以使用java的keytool来生产。我们将生成密钥到一个临时的密钥库,之后我们可以导出并用CA签名它。

keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey

你需要用上面命令来指定两个参数。

  1. keystore: 密钥仓库存储证书文件。密钥仓库文件包含证书的私钥(保证私钥的安全)。
  2. validity: 证书的有效时间,天

配置主机名验证

从Kafka 2.0.0版开始,默认情况下客户端连接以及broker之间的连接启用了服务器的主机名验证,为防止中间人攻击。 可以通过将ssl.endpoint.identification.algorithm设置为空字符串来关闭服务器主机名验证。 例如,

ssl.endpoint.identification.algorithm=

对于动态配置的broker侦听,可以使用kafka-configs.sh禁用主机名验证。 如,

bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="

对于老版本的Kafka,默认情况下未定义ssl.endpoint.identification.algorithm,因此不会执行主机名验证。 通过设置为HTTPS以启用主机名验证。

ssl.endpoint.identification.algorithm=HTTPS

如果服务器端没有进行外部验证,则必须启用主机名验证,以防止中间人攻击。

在证书中配置主机名

如果启用了主机名验证,客户端将根据以下两个字段之一验证服务器的完全限定域名(FQDN):

  1. Common Name (CN)
  2. Subject Alternative Name (SAN)

这两个字段均有效,但是RFC-2818建议使用SAN。SAN更加的灵活,允许声明多个DNS条目。另一个优点是,出于授权目的,可以将CN设置为更有意义的值。 要添加SAN,需将以下参数-ext SAN=DNS:{FQDN}追加到keytool命令:

keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -ext SAN=DNS:{FQDN}

然后可以运行以下命令来验证生成的证书的内容:

keytool -list -v -keystore server.keystore.jks

2. 创建自己的CA

通过第一步,集群中的每台机器都生成一对公私钥,和一个证书来识别机器。但是,证书是未签名的,这意味着攻击者可以创建一个这样的证书来伪装成任何机器。

因此,通过对集群中的每台机器进行签名来防止伪造的证书。证书颁发机构(CA)负责签名证书。CA的工作机制像一个颁发护照的政府。政府印章(标志)每本护照,这样护照很难伪造。其他政府核实护照的印章,以确保护照是真实的。同样,CA签名的证书和加密保证签名证书很难伪造。因此,只要CA是一个真正和值得信赖的权威,client就能有较高的保障连接的是真正的机器。

openssl req -new -x509 -keyout ca-key -out ca-cert -days 365

生成的CA是一个简单的公私钥对证书,用于签名其他的证书。

下一步是将生成的CA添加到**clients' truststore(客户的信任库)**,以便client可以信任这个CA:

keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

注意,还需设置ssl.client.auth(”requested" 或 "required”),来要求broker对客户端连接进行验证,当然,你必须为broker提供信任库以及所有客户端签名了密钥的CA证书,通过下面的命令:

keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert

相反,在步骤1中,密钥库存储了每个机器自己的身份。客户端的信任库存储所有客户端信任的证书,将证书导入到一个信任仓库也意味着信任由该证书签名的所有证书,正如上面的比喻,信任政府(CA)也意味着信任它颁发的所有护照(证书),此特性称为信任链,在大型的kafka集群上部署SSL时特别有用的。可以用单个CA签名集群中的所有证书,并且所有的机器共享相同的信任仓库,这样所有的机器也可以验证其他的机器了。

3. 签名证书

步骤2生成的CA来签名所有步骤1生成的证书,首先,你需要从密钥仓库导出证书:

keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file

然后用CA签名:

openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}

最后,你需要导入CA的证书和已签名的证书到密钥仓库:

keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

参数的定义如下:

  1. keystore: 密钥仓库的位置
  2. ca-cert: CA的证书
  3. ca-key: CA的私钥
  4. ca-password: CA的密码
  5. cert-file: 出口,服务器的未签名证书
  6. cert-signed: 已签名的服务器证书

这是上面所有步骤的一个bash脚本例子。注意,这里假设密码“test1234”。

#!/bin/bash
#Step 1
keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey
#Step 2
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
#Step 3
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

4. 配置Kafka Broker

Kafka Broker支持监听多个端口上的连接,通过server.properties 配置,最少监听1个端口,用逗号分隔。

listeners

如果broker之间通讯未启用SSL(参照下面,启动它),PLAINTEXT和SSL端口是必须要配置。

listeners=PLAINTEXT://host.name:port,SSL://host.name:port

下面是broker端需要的SSL配置,

ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=test1234

注意:ssl.truststore.password在是可选的,但强烈建议使用。如果未设置密码,则仍然可以访问信任库,但就不是完整性的检查了。

其他还有一些的可选的配置:

  1. ssl.client.auth = none (“required”=>客户端身份验证是必需的,“requested”=>客户端身份验证请求,客户端没有证书仍然可以连接。使用“requested”是纸老虎,因为它提供了一种虚假的安全感,错误的配置客户端仍将连接成功。)
  2. ssl.cipher.suites(可选)。密码套件是利用TLS或SSL网络协议的网络连接的安全设置。是认证,加密,MAC和密钥交换算法的组合。 (默认值是一个空表)
  3. ssl.enabled.protocols = TLSv1.2 TLSv1.1 TLSv1 (接收来自客户端列出的SSL协议,注意,不推荐在生产中使用SSL,推荐使用TLS)。
  4. ssl.keystore.type=JKS
  5. ssl.truststore.type=JKS
  6. ssl.secure.random.implementation=SHA1PRNG

如果你想启用SSL用于broker内部通讯,将以下内容添加到broker配置文件(默认是PLAINTEXT)

security.inter.broker.protocol=SSL

由于一些国家的进口规定,oracle的实现限制了默认的加密算法的强度。如果需要更强的算法(例如AES 256位密钥),该JCE Unlimited Strength Jurisdiction Policy Files必须获得并安装JDK和JRE。更多信息参见JCA文档

JRE/JDK有一个默认伪随机数生成者(PRNG),用于加密操作。因此,不需要用

ssl.secure.random.implementation

实现。 然而,有些实现存在性能问题(尤其是在Linux系统选择的默认值

NativePRNG

,使用全局锁)。 在SSL连接的性能出现问题的情况下,请考虑明确设置要使用的实现。 如

SHA1PRNG

实现是非阻塞的,并且在高负载(50MB/秒的生成消息,以及每个代理的复制流量)下显示出非常好的性能特征。

一旦你启动broker,你应该就能在server.log看到

with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)

用以下命令,快速验证服务器的keystore和truststore设置是否正确:

openssl s_client -debug -connect localhost:9093 -tls1

(注意: TLSv1 应列出 ssl.enabled.protocols)
在命令的输出中,你应该能看到服务器的证书:

  -----BEGIN CERTIFICATE-----
        {variable sized random bytes}
        -----END CERTIFICATE-----
        subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
        issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com

如果证书没有出现或者有任何其他错误信息,那么你的keystore设置不正确。

5. 配置Kafka客户端

SSL仅支持新的Producer和Consumer,不支持老的API,Producer和Consumer的SSL的配置是相同的。

如果broker中不需要client(客户端)验证,那么下面是最小的配置示例:

security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=test1234

注意:ssl.truststore.password在技术上是可选的,但强烈推荐。 如果未设置密码,对信任库的访问仍然可用,就不属于完整性检查。 如果需要客户端认证,则必须像步骤1一样创建密钥库,并且还必须配置以下内容:

ssl.keystore.location=/var/private/ssl/client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

也可以根据我的需求在broker上设置其他的配置:

  1. ssl.provider (可选的). 用于SSL连接的安全提供程序名称,默认值是JVM的默认的安全提供程序。
  2. ssl.cipher.suites (可选).密码套件是身份验证、 加密、 MAC 和密钥交换算法用于协商使用 TLS 或 SSL 网络协议的网络连接的安全设置的已命名的组合。
  3. ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1。broker配置协议(至少一个)。
  4. ssl.truststore.type=JKS
  5. ssl.keystore.type=JKS

举个console-produceconsole-consumer的例子:

kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties
kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties

实战笔记

kafka实战ssl



上一条: kafka安全概述
下一条: kafka认证和acl

加菲猫 14天前

检查SSL配置是否有效的命令 过时了
openssl s_client -debug -connect localhost:9093 -tls1
现在的版本是:TLSv1.2,命令是:
openssl s_client -debug -connect localhost:9093 -tls1_2
通过 -help能查看具体操作:
openssl s_client -help

半兽人 -> 加菲猫 14天前

朋友,我喜欢你这样的.

加菲猫 -> 半兽人 14天前

我正好有点问题 想请教一下,我搭了一个kafka集群,使用了SSL,kafka tool能连上,但只显示一个kafka broker,应该是集群没组好,但没找到原因。

broker1/docker-compose.yml

version: "3.8"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    # restart: always
    ports:
      - ${ZK_PORT_1}:2181
      - ${ZK_PORT_1_2}:2888
      - ${ZK_PORT_1_3}:3888
    volumes:
      - ./zookeeper/data:/opt/zookeeper/data
      - ./zookeeper/logs:/opt/zookeeper/logs
    environment:
      TZ: Asia/Shanghai
      CLIENT_PORT: 2181
      TICK_TIME: 2000
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=${SERVER_2}:${ZK_PORT_2_2}:${ZK_PORT_2_3}

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    # restart: always
    ports:
      - ${KAFKA_PORT_1}:${KAFKA_PORT_1}
    volumes:
      # - ./kafka/docker.sock:/var/run/docker.sock
      - ./kafka/logs:/kafka
      - ../.env/certs:/certs
    depends_on:
      - zookeeper
    environment:
      TZ: Asia/Shanghai
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENERS: SSL://kafka:${KAFKA_PORT_1}
      KAFKA_ADVERTISED_LISTENERS: SSL://${SERVER_1}:${KAFKA_PORT_1}
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181,${SERVER_2}:${ZK_PORT_2}
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: false
      # 分区数
      KAFKA_NUM_PARTITIONS: 1
      # 副本数
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_NUM_REPLICA_FETCHERS: 5
      KAFKA_SSL_CLIENT_AUTH: required
      KAFKA_SSL_SECURE_RANDOM_IMPLEMENTATION: SHA1PRNG
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: " "
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
      KAFKA_SSL_KEYSTORE_LOCATION: /certs/kafka.my.keystore.jks
      KAFKA_SSL_KEYSTORE_PASSWORD: ${PASSWORD}
      KAFKA_SSL_KEY_PASSWORD: ${PASSWORD}
      KAFKA_SSL_TRUSTSTORE_LOCATION: /certs/kafka.truststore.jks
      KAFKA_SSL_TRUSTSTORE_PASSWORD: ${PASSWORD}

broker2/docker-compose.yml

version: "3.8"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper_2
    # restart: always
    ports:
      - ${ZK_PORT_2}:2181
      - ${ZK_PORT_2_2}:2888
      - ${ZK_PORT_2_3}:3888
    volumes:
      - ./zookeeper/data:/opt/zookeeper/data
      - ./zookeeper/logs:/opt/zookeeper/logs
      # - ../.env/zookeeper_jaas.conf:/etc/kafka/zookeeper_jaas.conf
    environment:
      TZ: Asia/Shanghai
      CLIENT_PORT: 2181
      TICK_TIME: 2000
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=${SERVER_1}:${ZK_PORT_1_2}:${ZK_PORT_1_3} server.2=0.0.0.0:2888:3888

      # KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf
      #   -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
      #   -Dzookeeper.allowSaslFailedClients=false
      #   -Dzookeeper.requireClientAuthScheme=sasl

  kafka:
    image: wurstmeister/kafka
    container_name: kafka_2
    # restart: always
    ports:
      - ${KAFKA_PORT_2}:${KAFKA_PORT_2}
      # - 29092:29092
    volumes:
      # - ./kafka/docker.sock:/var/run/docker.sock
      - ./kafka/logs:/kafka
      - ../.env/certs:/certs
      # - ../.env/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
    depends_on:
      - zookeeper
    environment:
      TZ: Asia/Shanghai
      KAFKA_BROKER_ID: 2
      KAFKA_LISTENERS: SSL://kafka:${KAFKA_PORT_2}
      KAFKA_ADVERTISED_LISTENERS: SSL://${SERVER_2}:${KAFKA_PORT_2}
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181,${SERVER_1}:${ZK_PORT_1}
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: false
      # 分区数
      KAFKA_NUM_PARTITIONS: 1
      # 副本数
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_NUM_REPLICA_FETCHERS: 5
      KAFKA_SSL_CLIENT_AUTH: required
      KAFKA_SSL_SECURE_RANDOM_IMPLEMENTATION: SHA1PRNG
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: " "
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
      KAFKA_SSL_KEYSTORE_LOCATION: /certs/kafka.my.keystore.jks
      KAFKA_SSL_KEYSTORE_PASSWORD: ${PASSWORD}
      KAFKA_SSL_KEY_PASSWORD: ${PASSWORD}
      KAFKA_SSL_TRUSTSTORE_LOCATION: /certs/kafka.truststore.jks
      KAFKA_SSL_TRUSTSTORE_PASSWORD: ${PASSWORD}
半兽人 -> 加菲猫 13天前

这没点错误提示很难定位问题的呀,你如果找到相关的错误信息,到问题专区直接提问吧。
这里施展不开。

1年前

还有我启用了主机名验证的话,未来kafka扩容咋办?需要把新的主机名加入到证书验证里面?那就还得替换集群其他正在使用的证书啊

1年前

请问主机名验证里面,alias和SAN DNS设置是都写hostname吗?

简单爱 1年前

请问一下,如果我客户端使用的是java SDK,应该如何操作呢?

luyun 2年前
SL routines:ssl3_get_record:wrong version number:../ssl/record/ssl3_record.c:332:

no peer certificate available
---
No client certificate CA names sent
---
SSL handshake has read 5 bytes and written 122 bytes

你好执行openssl s_client -debug -connect localhost:9093 -tls1时候报错如上,请问什么原因呢

萝卜权。 2年前

请问下,如果kafka的keytab文件在普通用户在生成,然而启动kafka集群是在root用户下,这样能带着普通用户下的keytab访问root用户下启动的kafka集群么,我现在发生的情况存在这个疑问,特意来需求下大佬的意见。

解决了。。

半兽人 -> 萝卜权。 2年前

就知道你可以。

3年前

你好,按照文档配置完成 ,producer启动后 在控制台输入内容 提示java.security.cert.CertificateException: No name matching localhost found
请教怎么处理?

半兽人 -> 3年前

在创建密钥库是没有设置进去,必须是你访问的域名,如果是本地测试,可以用localhost

zzz -> 半兽人 3年前

如果是放在服务器上面,我的producer和consumer要通过ip来接发数据 还是正常的ip加port吗

半兽人 -> zzz 3年前

没差,创建密钥库的时候注意设置上。

zzz -> 半兽人 3年前

是在创建时 提示输入 姓名姓式这个位置设置成服务器的ip吗

半兽人 -> zzz 3年前

你搜索下这篇文章的 localhost 

-> zzz 2年前

你调试好了吗,我也是安装了好几遍没有成功

3年前

关于SSL这部分,各个brokers节点创建的公钥(cer文件)要导入到同个truststore中,然后发布到各个brokers,即各个broker用同一个truststore,因为各个brokers之间互为client-server关于,不在truststore中保存的key是非安全不能通过验证。

-> 2年前

大神,有没有更具体的操作步骤?

你好,请问下,客户端配置支持ssl认证的部分,location对应的配置文件,路径可以直接写成服务器上面的嘛(就是server.properties里配置的ssl location路径),还是需要我自己在客户端建一个对应的目录然后引入这个目录下的相应文件?

这个还为关注。

格律诗 4年前

Hi, 半兽人。我看了你对kafka官方文档的翻译之后,我想咨询您一下:您是否在非kerberos模式下配置kafka acl,您是怎么处理这种场景的?

格律诗 -> 格律诗 4年前

因为我这边配置SSL后,配置ACL,在打开终端消费的时候,会报一下错误:
执行的命令是:
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper bigdata001:2181 --topic test-topic
错误日志:
kafka.common.BrokerEndPointNotAvailableException: End point with security protocol PLAINTEXT not found for broker 1001

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章
提问