赞
踩
在实际生产环境中,当请求激增时,kafka 生产者发送的消息数量会远远大于 kafka 消费者的消费能力,从而导致消息堆积和处理延迟。为了避免此种情况,就要求消费者能够感知到 kafka 消息堆积,并通过动态增加或减少自身的副本数,实现动态自适应消费,这就是本文即将介绍的内容,即基于 kafka_consumergroup_lag 指标实现 Consumer Pod 水平弹性伸缩。
Kubernetes 通过 HPA 实现 Pod 的水平弹性伸缩,默认支持多种类型,包括 Resource、Pods、Object、External、ContainerResource。有关 HPA 的更多官方介绍请参考官方文档:https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/,本文不作冗余说明。
由于 kafka_consumergroup_lag 指标不是从待扩缩容的消费者 Pod 上采集上来的,没有与 K8s 资源对象建立关联关系,因此这里需要使用 External 类型的 HPA。
整体架构图如下:
如果已经有 Kubernetes 集群了,可以跳过该步骤
本文使用 Kind 创建一个测试集群, 准备如下配置文件,命名为 kind-cluster.yaml:
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
image: kindest/node:v1.28.0@sha256:b7a4cad12c197af3ba43202d3efe03246b3f0793f162afb40a33c923952d5b31
- role: worker
image: kindest/node:v1.28.0@sha256:b7a4cad12c197af3ba43202d3efe03246b3f0793f162afb40a33c923952d5b31
- role: worker
image: kindest/node:v1.28.0@sha256:b7a4cad12c197af3ba43202d3efe03246b3f0793f162afb40a33c923952d5b31
执行如下命令,创建 Kind 集群:
kind create cluster --config kind-cluster.yaml
这里使用 Helm 一键安装:
helm repo add kafka-repo https://helm-charts.itboon.top/kafka
helm repo update kafka-repo
helm upgrade --install kafka \
--namespace kafka \
--create-namespace \
--set broker.combinedMode.enabled="true" \
--set broker.persistence.enabled="false" \
kafka-repo/kafka
注意,此种安装关闭了持久化存储,单实例最小化运行,仅用于测试环境。
git clone https://github.com/prometheus-operator/kube-prometheus.git
cd kube-prometheus
kubectl create -f manifests/setup
kubectl create -f manifests/
通过端口转发,访问 Prometheus 看板:
kubectl port-forward service/prometheus-k8s 9090:9090 -n monitoring
执行上述端口转发命令后,浏览器访问 http://localhost:9090
准备如下 kafka-exporter.yaml 文件:
--- apiVersion: apps/v1 kind: Deployment metadata: name: kafka-exporter namespace: monitoring labels: app: kafka-exporter spec: replicas: 1 selector: matchLabels: app: kafka-exporter template: metadata: labels: app: kafka-exporter spec: containers: - name: kafka-exporter image: danielqsj/kafka-exporter:v1.6.0 imagePullPolicy: IfNotPresent args: ["--kafka.server=kafka-headless.kafka:9092"] ports: - containerPort: 9308 name: http --- apiVersion: v1 kind: Service metadata: labels: app: kafka-exporter name: kafka-exporter namespace: monitoring spec: type: ClusterIP ports: - name: http port: 9308 protocol: TCP targetPort: 9308 selector: app: kafka-exporter
执行如下命令安装:
kubectl apply -f kafka-exporter.yaml
本文通过创建 ServiceMonitor 实现,准备如下 kafka-service-monitor.yaml 文件:
--- apiVersion: monitoring.coreos.com/v1 kind: ServiceMonitor metadata: labels: app: kafka-exporter name: prometheus-kafka-exporter namespace: monitoring spec: endpoints: - honorLabels: true interval: 1m path: /metrics port: http scheme: http params: target: - 'kafka-headless.kafka:9092' relabelings: - sourceLabels: [__param_target] targetLabel: instance namespaceSelector: matchNames: - monitoring selector: matchLabels: app: kafka-exporter
执行如下命令,创建 ServiceMonitor:
kubectl apply -f kafka-service-monitor.yaml
通过 kubectl exec 进入 kafka 容器,执行 bin/kafka-topics.sh 命令创建 Topic:
kubectl exec -it -n kafka kafka-broker-0 bash
bin/kafka-topics.sh --bootstrap-server kafka-headless.kafka:9092 \
--create --topic custom-topic \
--replication-factor 1 \
--partitions 3
准备如下 kafka-consumer.yaml 文件:
apiVersion: apps/v1 kind: Deployment metadata: name: consumer-kafka-go-client spec: replicas: 1 selector: matchLabels: lang: golang kafka: consumer template: metadata: labels: lang: golang kafka: consumer spec: containers: - name: consumer-kafka-go-client image: shidaqiu/kafka-client:1.1 command: - ./consumer - kafka-headless.kafka:9092 - custom-topic - golang-consumer - "100" # WaitMs - "2000" - plaintext resources: limits: cpu: 50m memory: 300Mi
执行如下命令创建:
kubectl apply -f kafka-consumer.yaml
准备如下 kafka-producer.yaml 文件:
apiVersion: apps/v1 kind: Deployment metadata: name: producer-kafka-go-client spec: replicas: 1 selector: matchLabels: lang: golang kafka: producer template: metadata: labels: lang: golang kafka: producer spec: containers: - name: producer-kafka-go-client image: shidaqiu/kafka-client:1.1 command: - ./producer - kafka-headless.kafka:9092 - custom-topic - "100" - "10000" - none - "1000" # WaitMs - "2000" - plaintext
执行如下命令创建:
kubectl apply -f kafka-producer.yaml
登陆 Prometheus 看板,检查 kafka_consumergroup_lag 指标被成功采集
首先,下载 helm chart 包到本地
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update
helm pull --untar prometheus-community/prometheus-adapter
然后,编辑其 values.yaml 文件,修改参数,包括设置 Prometheus URL 地址,以及增加 kafka_consumergroup_lag 的指标转换规则,示例如下:
prometheus:
# Value is templated
url: http://prometheus-k8s.monitoring
port: 9090
path: ""
rules:
external:
- seriesQuery: '{topic!="", __name__=~"kafka_consumergroup_lag"}'
resources:
template: <<.Resource>>
name:
as: "hpa_kafka_consumergroup_lag"
metricsQuery: sum(min_over_time(kafka_consumergroup_lag{<< range $key, $value :=.LabelValuesByName >><< if ne $key "namespace" >><< $key >>="<< $value >>",<<end >><< end >>}[1h])) by (topic,consumergroup)
最后,使用新配置,部署 Prometheus Adaptor:
helm upgrade --install prometheus-adaptor . -n monitoring
部署完成后,可以通过如下命令验证指标采集和转换无误:
kubectl get --raw "/apis/external.metrics.k8s.io/v1beta1/namespaces/monitoring/hpa_kafka_consumergroup_lag" | jq .
输入类似如下内容,则表示工作正常:
{ "kind": "ExternalMetricValueList", "apiVersion": "external.metrics.k8s.io/v1beta1", "metadata": {}, "items": [ { "metricName": "hpa_kafka_consumergroup_lag", "metricLabels": { "consumergroup": "golang-consumer", "topic": "custom-topic" }, "timestamp": "2024-03-02T14:26:15Z", "value": "0" } ] }
准备 consumer-hpa.yaml 文件:
apiVersion: autoscaling/v2 kind: HorizontalPodAutoscaler metadata: name: consumer-kafka-go-client-hpa spec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment name: consumer-kafka-go-client minReplicas: 1 maxReplicas: 3 metrics: - type: External external: metric: name: hpa_kafka_consumergroup_lag selector: matchLabels: topic: custom-topic target: type: Value value: "300"
执行如下命令,应用 HPA:
kubectl apply -f consumer-hpa.yaml
通过如下命令扩容 producer 实例数量
kubectl scale deploy/producer-kafka-go-client --replicas 2
可以观察到当 hpa_kafka_consumergroup_lag 超过 300 时,能够触发消费者实例扩容:
本文相关源码:https://github.com/SataQiu/kafka_metrics_hpa
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。