Deploying a Kafka Cluster on Kubernetes

By 半兽人 · Published: 2019-08-08 · Last updated: 2019-08-11

Apache Kafka is a popular distributed streaming platform. Kafka producers write data to partitioned topics, which are stored on a cluster of brokers with a configurable number of replicas. Consumers then read the data stored in the brokers' partitions.

Note: Detailed information can be found here. You can learn more about running a Kafka cluster on Kubernetes here.

First, create kafka_mini.yaml:

apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: kafka-pdb
spec:
  selector:
    matchLabels:
      app: kafka
  maxUnavailable: 1
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-hs
  replicas: 3
  podManagementPolicy: Parallel
  updateStrategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: kafka
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: "app"
                operator: In
                values:
                - kafka
            topologyKey: "kubernetes.io/hostname"
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 1
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: "app"
                  operator: In
                  values:
                  - zk
              topologyKey: "kubernetes.io/hostname"
      terminationGracePeriodSeconds: 300
      containers:
      - name: k8skafka
        imagePullPolicy: Always
        image: gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1
        resources:
          requests:
            memory: "1Gi"
            cpu: "0.5"
        ports:
        - containerPort: 9093
          name: server
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9093 \
          --override zookeeper.connect=zk-cs.default.svc.cluster.local:2181 \
          --override log.dir=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=false \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override inter.broker.protocol.version=0.10.2-IV0 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value : "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/kafka
        readinessProbe:
          exec:
           command:
            - sh
            - -c
            - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9093"
      securityContext:
        runAsUser: 1000
        fsGroup: 1000
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 10Gi
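
Note that the StatefulSet above sets serviceName: kafka-hs, and the apply output below also reports a service named kafka-hs being created, but the manifest shown here does not define it. A minimal sketch of such a headless Service (assuming the same broker port 9093 and the app: kafka label used above) would look like the following; add it to kafka_mini.yaml if it is not already defined elsewhere:

apiVersion: v1
kind: Service
metadata:
  name: kafka-hs
  labels:
    app: kafka
spec:
  ports:
  - port: 9093
    name: server
  clusterIP: None
  selector:
    app: kafka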

Then apply it:

$ kubectl apply -f kafka_mini.yaml

service "kafka-hs" created
poddisruptionbudget "kafka-pdb" created
statefulset "kafka" created

This creates the Kafka cluster as a StatefulSet. Kafka connects to the ZooKeeper ensemble through the zk-cs service; if you have not deployed ZooKeeper yet, install it first.
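
Assuming ZooKeeper was deployed following that tutorial (which exposes a client Service named zk-cs and labels its Pods app=zk, as the podAffinity rule above expects), a quick sanity check before creating the Kafka cluster might be:

$ kubectl get svc zk-cs
$ kubectl get po -l app=zk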

This Kafka cluster is intended for demonstration purposes; its size and settings are not suitable for production use.

If you watch the Pods being created, you will notice that the Kafka cluster uses the Parallel podManagementPolicy, so all Pods are launched at the same time rather than one after another.
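
The relevant fields from the manifest above are:

spec:
  podManagementPolicy: Parallel   # launch (and terminate) all Pods in parallel
  updateStrategy:
    type: RollingUpdate           # rolling updates still proceed one Pod at a time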

$ kubectl get po -lapp=kafka -w

NAME      READY     STATUS              RESTARTS   AGE
kafka-0   0/1       Pending             0          0s
kafka-0   0/1       Pending             0          0s
kafka-1   0/1       Pending             0          0s
kafka-1   0/1       Pending             0          0s
kafka-2   0/1       Pending             0          0s
kafka-0   0/1       ContainerCreating   0          0s
kafka-2   0/1       Pending             0          0s
kafka-1   0/1       ContainerCreating   0          0s
kafka-1   0/1       Running             0          11s
kafka-0   0/1       Running             0          19s
kafka-1   1/1       Running             0          23s
kafka-0   1/1       Running             0          32s

Producing and consuming data

You can use kubectl run to execute the kafka-topics.sh script and create a topic named test.

$ kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 createtopic --restart=Never --rm -- kafka-topics.sh --create \
> --topic test \
> --zookeeper zk-cs.default.svc.cluster.local:2181 \
> --partitions 1 \
> --replication-factor 3
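
To confirm the topic was created with the expected partition count and replication, you can run kafka-topics.sh --describe in the same way (the pod name describetopic here is arbitrary):

$ kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 describetopic --restart=Never --rm -- kafka-topics.sh --describe \
> --topic test \
> --zookeeper zk-cs.default.svc.cluster.local:2181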

Now use kubectl run to execute kafka-console-consumer.sh and listen for messages:

$ kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 consume --restart=Never --rm -- kafka-console-consumer.sh --topic test --bootstrap-server kafka-0.kafka-hs.default.svc.cluster.local:9093

Then open a new terminal window and run the producer:

$ kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 produce --restart=Never --rm \
>   -- kafka-console-producer.sh --topic test --broker-list kafka-0.kafka-hs.default.svc.cluster.local:9093,kafka-1.kafka-hs.default.svc.cluster.local:9093,kafka-2.kafka-hs.default.svc.cluster.local:9093

Output from the second terminal appears in the first terminal. If you continue to produce and consume messages while updating the cluster, you will notice that no messages are lost. You may see error messages as the leader for the partition changes when individual brokers are updated, but the client retries until the message is committed. This is due to the ordered, sequential nature of StatefulSet rolling updates which we will explore further in the next section.

Updating the Kafka cluster

StatefulSet updates are like DaemonSet updates in that they are both configured by setting the spec.updateStrategy of the corresponding API object. When the update strategy is set to OnDelete, the respective controllers will only create new Pods when a Pod in the StatefulSet or DaemonSet has been deleted. When the update strategy is set to RollingUpdate, the controllers will delete and recreate Pods when a modification is made to the spec.template field of a DaemonSet or StatefulSet. You can use rolling updates to change the configuration (via environment variables or command line parameters), resource requests, resource limits, container images, labels, and/or annotations of the Pods in a StatefulSet or DaemonSet. Note that all updates are destructive, always requiring that each Pod in the DaemonSet or StatefulSet be destroyed and recreated. StatefulSet rolling updates differ from DaemonSet rolling updates in that Pod termination and creation is ordered and sequential.
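
For reference, these settings live under spec.updateStrategy of the StatefulSet; a minimal sketch (the partition field is covered below):

spec:
  updateStrategy:
    type: RollingUpdate      # or OnDelete
    rollingUpdate:
      partition: 0           # Pods with an ordinal >= partition are updated; 0 updates all Pods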

You can patch the kafka StatefulSet to reduce the CPU resource request to 250m.

$ kubectl patch sts kafka --type='json' -p='[{"op": "replace", "path": "/spec/template/spec/containers/0/resources/requests/cpu", "value":"250m"}]'

statefulset "kafka" patched

If you watch the status of the Pods in the StatefulSet, you will see that each Pod is deleted and recreated in reverse ordinal order (starting with the Pod with the largest ordinal and progressing to the smallest). The controller waits for each updated Pod to be running and ready before updating the subsequent Pod.

$ kubectl get po -lapp=kafka -w

NAME      READY     STATUS              RESTARTS   AGE
kafka-0   1/1       Running             0          13m
kafka-1   1/1       Running             0          13m
kafka-2   1/1       Running             0          13m
kafka-2   1/1       Terminating         0          14m
kafka-2   0/1       Terminating         0          14m
kafka-2   0/1       Terminating         0          14m
kafka-2   0/1       Terminating         0          14m
kafka-2   0/1       Pending             0          0s
kafka-2   0/1       Pending             0          0s
kafka-2   0/1       ContainerCreating   0          0s
kafka-2   0/1       Running             0          10s
kafka-2   1/1       Running             0          21s
kafka-1   1/1       Terminating         0          14m
kafka-1   0/1       Terminating         0          14m
kafka-1   0/1       Terminating         0          14m
kafka-1   0/1       Terminating         0          14m
kafka-1   0/1       Pending             0          0s
kafka-1   0/1       Pending             0          0s
kafka-1   0/1       ContainerCreating   0          0s
kafka-1   0/1       Running             0          11s
kafka-1   1/1       Running             0          21s
kafka-0   1/1       Terminating         0          14m
kafka-0   0/1       Terminating         0          14m
kafka-0   0/1       Terminating         0          14m
kafka-0   0/1       Terminating         0          14m
kafka-0   0/1       Pending             0          0s
kafka-0   0/1       Pending             0          0s
kafka-0   0/1       ContainerCreating   0          0s
kafka-0   0/1       Running             0          10s
kafka-0   1/1       Running             0          22s

Note that unplanned disruptions will not lead to unintentional updates during the update process. That is, the StatefulSet controller will always recreate the Pod at the correct version to ensure the ordering of the update is preserved. If a Pod is deleted, and if it has already been updated, it will be created from the updated version of the StatefulSet’s spec.template. If the Pod has not already been updated, it will be created from the previous version of the StatefulSet’s spec.template. We will explore this further in the following sections.
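
One way to see this for yourself (assuming a cluster version that records revisions in the StatefulSet status and labels Pods with controller-revision-hash) is to compare the StatefulSet's revisions with the revision label on each Pod:

$ kubectl get sts kafka -o jsonpath='{.status.currentRevision}{"\n"}{.status.updateRevision}{"\n"}'
$ kubectl get po -l app=kafka -L controller-revision-hash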

Staging an update

Depending on how your organization handles deployments and configuration modifications, you may want or need to stage updates to a StatefulSet prior to allowing the roll out to progress. You can accomplish this by setting a partition for the RollingUpdate. When the StatefulSet controller detects a partition in the updateStrategy of a StatefulSet, it will only apply the updated version of the StatefulSet’s spec.template to Pods whose ordinal is greater than or equal to the value of the partition.

You can patch the kafka StatefulSet to add a partition to the RollingUpdate update strategy. If you set the partition to a number greater than or equal to the StatefulSet’s spec.replicas (as below), any subsequent updates you perform to the StatefulSet’s spec.template will be staged for roll out, but the StatefulSet controller will not start a rolling update.

$ kubectl patch sts kafka -p '{"spec":{"updateStrategy":{"type":"RollingUpdate","rollingUpdate":{"partition":3}}}}'

statefulset "kafka" patched

If you patch the StatefulSet to set the requested CPU to 0.3, you will notice that none of the Pods are updated.

$ kubectl patch sts kafka --type='json' -p='[{"op": "replace", "path": "/spec/template/spec/containers/0/resources/requests/cpu", "value":"0.3"}]'

statefulset "kafka" patched

Even if you delete a Pod and wait for the StatefulSet controller to recreate it, you will notice that the Pod is recreated with the current CPU request rather than the staged one.

$ kubectl delete po kafka-1

pod "kafka-1" deleted


$ kubectl get po kafka-1 -w

NAME      READY     STATUS              RESTARTS   AGE
kafka-1   0/1       ContainerCreating   0          10s
kafka-1   0/1       Running             0          19s
kafka-1   1/1       Running             0          21s

$ kubectl get po kafka-1 -o yaml

apiVersion: v1
kind: Pod
metadata:
  ...
      resources:
        requests:
          cpu: 250m
          memory: 1Gi
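
An equivalent one-liner that prints just the requested CPU, using a JSONPath expression, might be:

$ kubectl get po kafka-1 -o jsonpath='{.spec.containers[0].resources.requests.cpu}'
250m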

Rolling out a canary

Often, we want to verify an image update or configuration change on a single instance of an application before rolling it out globally. If you modify the partition created above to be 2, the StatefulSet controller will roll out a canary that can be used to verify that the update is working as intended.

$ kubectl patch sts kafka -p '{"spec":{"updateStrategy":{"type":"RollingUpdate","rollingUpdate":{"partition":2}}}}'

statefulset "kafka" patched

You can watch the StatefulSet controller update the kafka-2 Pod and pause after the update is complete.

$ kubectl get po -lapp=kafka -w
NAME      READY     STATUS              RESTARTS   AGE
kafka-0   1/1       Running             0          50m
kafka-1   1/1       Running             0          10m
kafka-2   1/1       Running             0          29s
kafka-2   1/1       Terminating         0          34s
kafka-2   0/1       Terminating         0          38s
kafka-2   0/1       Terminating         0          39s
kafka-2   0/1       Terminating         0          39s
kafka-2   0/1       Pending             0          0s
kafka-2   0/1       Pending             0          0s
kafka-2   0/1       Terminating         0          20s
kafka-2   0/1       Terminating         0          20s
kafka-2   0/1       Pending             0          0s
kafka-2   0/1       Pending             0          0s
kafka-2   0/1       ContainerCreating   0          0s
kafka-2   0/1       Running             0          19s
kafka-2   1/1       Running             0          22s

Phased roll outs

Similar to rolling out a canary, you can roll out updates based on a phased progression (e.g. linear, geometric, or exponential roll outs).

If you patch the kafka StatefulSet to set the partition to 1, the StatefulSet controller updates one more broker.

$ kubectl patch sts kafka -p '{"spec":{"updateStrategy":{"type":"RollingUpdate","rollingUpdate":{"partition":1}}}}'

statefulset "kafka" patched

If you set it to 0, the StatefulSet controller updates the final broker and completes the update.

$ kubectl patch sts kafka -p '{"spec":{"updateStrategy":{"type":"RollingUpdate","rollingUpdate":{"partition":0}}}}'

statefulset "kafka" patched

Note that you don't have to decrement the partition by one. For a larger StatefulSet, for example one with 100 replicas, you might use a progression more like 100, 99, 90, 50, 0. In this case, you would stage your update, deploy a canary, roll out to 10 instances, update fifty percent of the Pods, and then complete the update.
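
As a sketch only, such a progression could be driven from a small shell loop (this assumes a kubectl version that supports rollout status for StatefulSets; insert whatever verification you need between stages):

#!/bin/sh
# Hypothetical phased roll out: lower the partition in stages and wait for
# the updated Pods to become Running and Ready before starting the next stage.
for p in 2 1 0; do
  kubectl patch sts kafka -p "{\"spec\":{\"updateStrategy\":{\"type\":\"RollingUpdate\",\"rollingUpdate\":{\"partition\":$p}}}}"
  kubectl rollout status sts/kafka
done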


