当前位置:   article > 正文

Strimzi从入门到精通系列之三:部署Kafka Connect

strimzi

一、概述

Kafka Connect 是一个用于在 Apache Kafka 和其他系统之间传输数据的工具。例如,Kafka Connect 可能会将 Kafka 与外部数据库或存储和消息传递系统集成

在Strimzi中,Kafka Connect以分布式方式部署。 Kafka Connect 也可以在独立模式下工作,但 Strimzi 不支持。

使用连接器的概念,Kafka Connect 提供了一个框架,用于将大量数据移入和移出 Kafka 集群,同时保持可扩展性和可靠性。

Cluster Operator 管理使用 KafkaConnect 资源部署的 Kafka Connect 集群以及使用 KafkaConnector 资源创建的连接器。

为了使用 Kafka Connect,您需要执行以下操作。

  • 部署 Kafka Connect 集群
  • 添加连接器以与其他系统集成

二、将 Kafka Connect 部署到 Kubernetes 集群

此过程演示如何使用 Cluster Operator 将 Kafka Connect 集群部署到 Kubernetes 集群。

Kafka Connect 集群部署是通过可配置数量的节点(也称为工作线程)实现的,这些节点将连接器的工作负载作为任务进行分配,从而使消息流具有高度可扩展性和可靠性。

该部署使用 YAML 文件来提供创建 KafkaConnect 资源的规范。

Strimzi 提供了示例配置文件。在此过程中,我们使用以下示例文件:

  • examples/connect/kafka-connect.yaml

先决条件

  • 必须部署 Cluster Operator。
  • 运行 Kafka 集群。

程序

1.将 Kafka Connect 部署到您的 Kubernetes 集群。使用examples/connect/kafka-connect.yaml文件部署Kafka Connect。

kubectl apply -f examples/connect/kafka-connect.yaml
  • 1

2.检查部署状态:

kubectl get pods -n <my_cluster_operator_namespace>
  • 1

输出显示部署名称和准备情况

NAME                                 READY  STATUS   RESTARTS
my-connect-cluster-connect-<pod_id>  1/1    Running  0
  • 1
  • 2

my-connect-cluster 是 Kafka Connect 集群的名称。

Pod ID 标识创建的每个 Pod。

使用默认部署,您可以创建单个 Kafka Connect Pod。

READY 显示已准备好/预期的副本数量。当状态显示为正在运行时,部署成功。

三、Kafka Connect配置

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect # (1)
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true" # (2)
spec:
  replicas: 3 # (3)
  authentication: # (4)
    type: tls
    certificateAndKey:
      certificate: source.crt
      key: source.key
      secretName: my-user-source
  bootstrapServers: my-cluster-kafka-bootstrap:9092 # (5)
  tls: # (6)
    trustedCertificates:
      - secretName: my-cluster-cluster-cert
        certificate: ca.crt
      - secretName: my-cluster-cluster-cert
        certificate: ca2.crt
  config: # (7)
    group.id: my-connect-cluster
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
  build: # (8)
    output: # (9)
      type: docker
      image: my-registry.io/my-org/my-connect-cluster:latest
      pushSecret: my-registry-credentials
    plugins: # (10)
      - name: debezium-postgres-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.1.3.Final/debezium-connector-postgres-2.1.3.Final-plugin.tar.gz
            sha512sum: c4ddc97846de561755dc0b021a62aba656098829c70eb3ade3b817ce06d852ca12ae50c0281cc791a5a131cb7fc21fb15f4b8ee76c6cae5dd07f9c11cb7c6e79
      - name: camel-telegram
        artifacts:
          - type: tgz
            url: https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-telegram-kafka-connector/0.11.5/camel-telegram-kafka-connector-0.11.5-package.tar.gz
            sha512sum: d6d9f45e0d1dbfcc9f6d1c7ca2046168c764389c78bc4b867dab32d24f710bb74ccf2a007d7d7a8af2dfca09d9a52ccbc2831fc715c195a3634cca055185bd91
  externalConfiguration: # (11)
    env:
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: aws-creds
            key: awsAccessKey
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: aws-creds
            key: awsSecretAccessKey
  resources: # (12)
    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi
  logging: # (13)
    type: inline
    loggers:
      log4j.rootLogger: "INFO"
  readinessProbe: # (14)
    initialDelaySeconds: 15
    timeoutSeconds: 5
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  metricsConfig: # (15)
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        name: my-config-map
        key: my-key
  jvmOptions: # (16)
    "-Xmx": "1g"
    "-Xms": "1g"
  image: my-org/my-image:latest # (17)
  rack:
    topologyKey: topology.kubernetes.io/zone # (18)
  template: # (19)
    pod:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: application
                    operator: In
                    values:
                      - postgresql
                      - mongodb
              topologyKey: "kubernetes.io/hostname"
    connectContainer: # (20)
      env:
        - name: OTEL_SERVICE_NAME
          value: my-otel-service
        - name: OTEL_EXPORTER_OTLP_ENDPOINT
          value: "http://otlp-host:4317"
  tracing:
    type: opentelemetry # (21)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  1. 使用 KafkaConnect。
  2. 为 Kafka Connect 集群启用 KafkaConnectors。
  3. 运行任务的工作人员的副本节点数量。
  4. Kafka Connect 集群的身份验证,指定为 mTLS、基于令牌的 OAuth、基于 SASL 的 SCRAM-SHA-256/SCRAM-SHA-512 或 PLAIN。默认情况下,Kafka Connect 使用plain text连接到 Kafka 代理。
  5. 用于连接 Kafka Connect 集群的引导服务器。
  6. 使用密钥名称进行 TLS 加密,在该密钥名称下,集群的 TLS 证书以 X.509 格式存储。如果证书存储在同一个密钥中,则可以多次列出它。
  7. Kafka Connect 工作线程(不是连接器)的配置。可以提供标准 Apache Kafka 配置,但仅限于那些不由 Strimzi 直接管理的属性。
  8. 构建配置属性,用于自动使用连接器插件构建容器映像。
  9. (必需)配置推送新镜像的容器注册表。
  10. (必需)要添加到新容器映像的连接器插件及其工件的列表。每个插件必须配置至少一个工件。
  11. 使用环境变量(如此处所示)或卷的连接器的外部配置。您还可以使用配置提供程序插件从外部源加载配置值。
  12. 请求预留受支持的资源(当前为 cpu 和内存),并限制指定可以消耗的最大资源。
  13. 通过 ConfigMap 直接(内联)或间接(外部)添加指定的 Kafka Connect 记录器和日志级别。自定义 ConfigMap 必须放置在 log4j.properties 或 log4j2.properties 键下。对于 Kafka Connect log4j.rootLogger 记录器,您可以将日志级别设置为 INFO、ERROR、WARN、TRACE、DEBUG、FATAL 或 OFF。
  14. 运行状况检查以了解何时重新启动容器(活动性)以及容器何时可以接受流量(就绪性)。
  15. Prometheus 指标,在本示例中通过引用包含 Prometheus JMX 导出器配置的 ConfigMap 来启用。您可以使用对包含metricsConfig.valueFrom.configMapKeyRef.key 下的空文件的ConfigMap 的引用来启用指标,而无需进一步配置。
  16. 用于优化运行 Kafka Connect 的虚拟机 (VM) 性能的 JVM 配置选项。
  17. 高级选项:容器镜像配置,仅在特殊情况下推荐使用。
  18. 专业选项:部署的机架感知配置。这是一个专门的选项,适用于同一位置内的部署,而不是跨区域的部署。如果您希望连接器从最近的副本而不是领导者副本使用,请使用此选项。在某些情况下,从最近的副本进行消费可以提高网络利用率或降低成本。 topologyKey 必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准topology.kubernetes.io/zone 标签指定区域。要从最近的副本进行消费,请在 Kafka 代理配置中启用 RackAwareReplicaSelector。
  19. 模板定制。这里的 pod 被调度为反亲和性,因此 pod 不会被调度到具有相同主机名的节点上。
  20. 设置环境变量用于分布式跟踪。
  21. 使用 OpenTelemetry 启用分布式跟踪。

四、为多个实例配置 Kafka Connect

如果您正在运行 Kafka Connect 的多个实例,则必须更改以下配置属性的默认配置:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect
spec:
  # ...
  config:
    group.id: connect-cluster (1)
    offset.storage.topic: connect-cluster-offsets (2)
    config.storage.topic: connect-cluster-configs (3)
    status.storage.topic: connect-cluster-status  (4)
    # ...
# ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • Kafka 中的 Kafka Connect 集群 ID。
  • 存储连接器偏移量的 Kafka 主题。
  • 存储连接器和任务状态配置的 Kafka 主题。
  • 存储连接器和任务状态更新的 Kafka 主题。

对于具有相同 group.id 的所有 Kafka Connect 实例,三个主题的值必须相同。

除非您更改默认设置,否则连接到同一 Kafka 集群的每个 Kafka Connect 实例都会使用相同的值进行部署。实际上,所有实例都耦合在集群中运行并使用相同的主题。

如果多个 Kafka Connect 集群尝试使用相同的主题,Kafka Connect 将无法按预期工作并生成错误。

如果您希望运行多个 Kafka Connect 实例,请更改每个实例的这些属性的值。

五、添加连接器

Kafka Connect 使用连接器与其他系统集成以传输数据。连接器是 Kafka 连接器类的实例,可以是以下类型之一:

source连接器:

  • 源连接器是一个运行时实体,它从外部系统获取数据并将其作为消息提供给 Kafka。

sink连接器:

  • 接收器连接器是一个运行时实体,它从 Kafka 主题获取消息并将其提供给外部系统。

Kafka Connect 使用插件架构为连接器提供实现工件。插件允许连接到其他系统并提供额外的配置来操作数据。插件包括连接器和其他组件,例如数据转换器和转换。连接器与特定类型的外部系统一起运行。每个连接器都为其配置定义一个架构。您向 Kafka Connect 提供配置以在 Kafka Connect 中创建连接器实例。然后,连接器实例定义一组用于在系统之间移动数据的任务。

通过以下方式之一将连接器插件添加到 Kafka Connect:

  • 配置 Kafka Connect 自动构建带有插件的新容器镜像
  • 从基础 Kafka Connect 映像创建 Docker 映像(手动或使用持续集成)

将插件添加到容器镜像后,您可以通过以下方式启动、停止和管理连接器实例:

  • 使用 Strimzi 的 KafkaConnector 自定义资源
  • 使用 Kafka Connect API

您还可以使用这些选项创建新的连接器实例。

六、自动使用连接器插件构建新的容器映像

配置 Kafka Connect,以便 Strimzi 自动构建带有附加连接器的新容器映像。您可以使用 KafkaConnect 自定义资源的 .spec.build.plugins 属性定义连接器插件。 Strimzi 将自动下载连接器插件并将其添加到新的容器映像中。容器被推送到 .spec.build.output 中指定的容器存储库中,并自动在 Kafka Connect 部署中使用。

先决条件:

  • 必须部署 Cluster Operator。
  • 容器注册表。

您需要提供自己的容器注册表,可以在其中推送、存储和提取图像。 Strimzi 支持私有容器注册表以及公共注册表,例如 Quay 或 Docker Hub。

程序

1.通过在 .spec.build.output 中指定容器注册表和在 .spec.build.plugins 中指定其他连接器来配置 KafkaConnect 自定义资源:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec: # (1)
  #...
  build:
    output: # (2)
      type: docker
      image: my-registry.io/my-org/my-connect-cluster:latest
      pushSecret: my-registry-credentials
    plugins: # (3)
      - name: debezium-postgres-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.1.3.Final/debezium-connector-postgres-2.1.3.Final-plugin.tar.gz
            sha512sum: c4ddc97846de561755dc0b021a62aba656098829c70eb3ade3b817ce06d852ca12ae50c0281cc791a5a131cb7fc21fb15f4b8ee76c6cae5dd07f9c11cb7c6e79
      - name: camel-telegram
        artifacts:
          - type: tgz
            url: https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-telegram-kafka-connector/0.11.5/camel-telegram-kafka-connector-0.11.5-package.tar.gz
            sha512sum: d6d9f45e0d1dbfcc9f6d1c7ca2046168c764389c78bc4b867dab32d24f710bb74ccf2a007d7d7a8af2dfca09d9a52ccbc2831fc715c195a3634cca055185bd91
  #...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • Kafka Connect 集群的规范。
  • (必需)配置推送新镜像的容器注册表。
  • (必需)要添加到新容器映像的连接器插件及其工件的列表。每个插件必须配置至少一个工件。

2.创建或更新资源:

 kubectl apply -f <kafka_connect_configuration_file>
  • 1

3.等待新的容器映像构建,并部署 Kafka Connect 集群。

4.使用 Kafka Connect REST API 或 KafkaConnector 自定义资源来使用您添加的连接器插件。

七、使用 Kafka Connect 基础镜像中的连接器插件构建新的容器镜像

使用 Kafka Connect 基础映像中的连接器插件创建自定义 Docker 映像 将自定义映像添加到 /opt/kafka/plugins 目录。

您可以使用 Container Registry 上的 Kafka 容器映像作为基础映像,通过其他连接器插件创建您自己的自定义映像。

启动时,Strimzi 版本的 Kafka Connect 会加载 /opt/kafka/plugins 目录中包含的任何第三方连接器插件。

先决条件

  • 必须部署 Cluster Operator。

程序

  • 使用 quay.io/strimzi/kafka:0.35.1-kafka-3.4.0 作为基础镜像创建一个新的 Dockerfile:
FROM quay.io/strimzi/kafka:0.35.1-kafka-3.4.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
  • 1
  • 2
  • 3
  • 4

示例插件文件

$ tree ./my-plugins/
./my-plugins/
├── debezium-connector-mongodb
│   ├── bson-<version>.jar
│   ├── CHANGELOG.md
│   ├── CONTRIBUTE.md
│   ├── COPYRIGHT.txt
│   ├── debezium-connector-mongodb-<version>.jar
│   ├── debezium-core-<version>.jar
│   ├── LICENSE.txt
│   ├── mongodb-driver-core-<version>.jar
│   ├── README.md
│   └── # ...
├── debezium-connector-mysql
│   ├── CHANGELOG.md
│   ├── CONTRIBUTE.md
│   ├── COPYRIGHT.txt
│   ├── debezium-connector-mysql-<version>.jar
│   ├── debezium-core-<version>.jar
│   ├── LICENSE.txt
│   ├── mysql-binlog-connector-java-<version>.jar
│   ├── mysql-connector-java-<version>.jar
│   ├── README.md
│   └── # ...
└── debezium-connector-postgres
    ├── CHANGELOG.md
    ├── CONTRIBUTE.md
    ├── COPYRIGHT.txt
    ├── debezium-connector-postgres-<version>.jar
    ├── debezium-core-<version>.jar
    ├── LICENSE.txt
    ├── postgresql-<version>.jar
    ├── protobuf-java-<version>.jar
    ├── README.md
    └── # ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

COPY 命令指向要复制到容器映像的插件文件。

此示例添加了 Debezium 连接器(MongoDB、MySQL 和 PostgreSQL)的插件,但为了简洁起见并未列出所有文件。在 Kafka Connect 中运行的 Debezium 看起来与任何其他 Kafka Connect 任务相同。

2.构建容器镜像。

3.将您的自定义映像推送到容器注册表。

4.指向新的容器映像。

您可以通过以下方式之一指向图像:

编辑 KafkaConnect 自定义资源的 KafkaConnect.spec.image 属性。

如果设置,此属性将覆盖 Cluster Operator 中的 STRIMZI_KAFKA_CONNECT_IMAGES 环境变量。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec: (1)
  #...
  image: my-new-container-image (2)
  config: (3)
    #...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

1.Kafka Connect 集群的规范。

2.Pod 的 docker 镜像。

3.Kafka Connect 工作线程(不是连接器)的配置。

编辑 install/cluster-operator/060-Deployment-strimzi-cluster-operator.yaml 文件中的 STRIMZI_KAFKA_CONNECT_IMAGES 环境变量以指向新的容器映像,然后重新安装 Cluster Operator。

八、部署KafkaConnector资源

部署KafkaConnector资源来管理连接器。 KafkaConnector 自定义资源提供了一种 Kubernetes 原生方法来通过 Cluster Operator 管理连接器。您不需要像使用 Kafka Connect REST API 那样发送 HTTP 请求来管理连接器。您可以通过更新其相应的 KafkaConnector 资源,然后应用更新来管理正在运行的连接器实例。 Cluster Operator 更新正在运行的连接器实例的配置。您可以通过删除对应的 KafkaConnector 来删除连接器。

KafkaConnector 资源必须部署到与其链接的 Kafka Connect 集群相同的命名空间。

在此过程中显示的配置中,autoRestart 属性设置为 true。这可以自动重新启动失败的连接器和任务。最多进行七次重新启动尝试,之后必须手动重新启动。您可以注释 KafkaConnector 资源以重新启动连接器或手动重新启动连接器任务。

连接器示例

您可以使用自己的连接器或尝试 Strimzi 提供的示例。在 Apache Kafka 3.1.0 之前,示例文件连接器插件包含在 Apache Kafka 中。从 Apache Kafka 3.1.1 和 3.2.0 版本开始,需要像任何其他连接器一样将示例添加到插件路径中。

Strimzi 为示例文件连接器插件提供了一个示例 KafkaConnector 配置文件 (examples/connect/source-connector.yaml),该文件创建以下连接器实例作为 KafkaConnector 资源:

  • 一个 FileStreamSourceConnector 实例,从 Kafka 许可证文件(源)读取每一行,并将数据作为消息写入单个 Kafka 主题。
  • 一个 FileStreamSinkConnector 实例,用于从 Kafka 主题读取消息并将消息写入临时文件(接收器)。

我们在此过程中使用示例文件创建连接器。

先决条件

  • Kafka Connect 部署
  • Cluster Operator 正在运行

程序
1.通过以下方式之一将 FileStreamSourceConnector 和 FileStreamSinkConnector 插件添加到 Kafka Connect:

  • 配置 Kafka Connect 自动构建带有插件的新容器镜像
  • 从基础 Kafka Connect 映像创建 Docker 映像(手动或使用持续集成)

2.在 Kafka Connect 配置中将 strimzi.io/use-connector-resources 注释设置为 true。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
    # ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

启用 KafkaConnector 资源后,Cluster Operator 会监视它们。

3.编辑 example/connect/source-connector.yaml 文件:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector # (1)
  labels:
    strimzi.io/cluster: my-connect-cluster # (2)
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector # (3)
  tasksMax: 2 # (4)
  autoRestart: # (5)
    enabled: true
  config: # (6)
    file: "/opt/kafka/LICENSE" # (7)
    topic: my-topic (8)
    # ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  1. KafkaConnector资源的名称,用作连接器的名称。使用对 Kubernetes 资源有效的任何名称。
  2. 要在其中创建连接器实例的 Kafka Connect 集群的名称。连接器必须部署到与其链接到的 Kafka Connect 集群相同的命名空间。
  3. 连接器类的全名或别名。这应该出现在 Kafka Connect 集群使用的图像中。
  4. 连接器可以创建的 Kafka Connect 任务的最大数量。
  5. 启用失败的连接器和任务的自动重新启动。
  6. 连接器配置为键值对。
  7. 此示例源连接器配置从 /opt/kafka/LICENSE 文件读取数据。
  8. 将源数据发布到的 Kafka 主题。

4.在 Kubernetes 集群中创建源 KafkaConnector:

kubectl apply -f examples/connect/source-connector.yaml
  • 1

5.创建 example/connect/sink-connector.yaml 文件:

touch examples/connect/sink-connector.yaml
  • 1

6.将以下 YAML 粘贴到接收器连接器.yaml 文件中:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  labels:
    strimzi.io/cluster: my-connect
spec:
  class: org.apache.kafka.connect.file.FileStreamSinkConnector (1)
  tasksMax: 2
  config: (2)
    file: "/tmp/my-file" (3)
    topics: my-topic (4)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  1. 连接器类的全名或别名。这应该出现在 Kafka Connect 集群使用的图像中。
  2. 连接器配置为键值对。
  3. 用于将源数据发布到的临时文件。
  4. 用于读取源数据的 Kafka 主题。

7.在 Kubernetes 集群中创建接收器 KafkaConnector:

kubectl apply -f examples/connect/sink-connector.yaml
  • 1

8.检查连接器资源是否已创建:

kubectl get kctr --selector strimzi.io/cluster=<my_connect_cluster> -o name

my-source-connector
my-sink-connector
  • 1
  • 2
  • 3
  • 4

将 <my_connect_cluster> 替换为 Kafka Connect 集群的名称。

9.在容器中,执行 kafka-console-consumer.sh 以读取源连接器写入主题的消息:

kubectl exec <my_kafka_cluster>-kafka-0 -i -t 
-- bin/kafka-console-consumer.sh 
--bootstrap-server <my_kafka_cluster>-kafka-bootstrap.NAMESPACE.svc:9092 
--topic my-topic --from-beginning
  • 1
  • 2
  • 3
  • 4

将 <my_kafka_cluster> 替换为您的 Kafka 集群的名称。

九、手动重新启动连接器

如果您使用 KafkaConnector 资源来管理连接器,请使用重新启动注释来手动触发连接器的重新启动。

先决条件

  • Cluster Operator 正在运行。

程序

1.找到控制要重新启动的 Kafka 连接器的 KafkaConnector 自定义资源的名称:

kubectl get KafkaConnector
  • 1

2.通过在 Kubernetes 中注释 KafkaConnector 资源来重新启动连接器。

kubectl annotate KafkaConnector <kafka_connector_name> strimzi.io/restart=true
  • 1

重新启动注释设置为 true。

3.等待下一次协调发生(默认情况下每两分钟一次)。

只要协调进程检测到注释,Kafka 连接器就会重新启动。当 Kafka Connect 接受重启请求时,注释将从 KafkaConnector 自定义资源中删除。

十、手动重启Kafka连接器任务

如果您使用 KafkaConnector 资源来管理连接器,请使用 restart-task 注解手动触发连接器任务的重新启动。

先决条件

  • Cluster Operator 正在运行。

程序

1.找到控制要重新启动的 Kafka 连接器任务的 KafkaConnector 自定义资源的名称:

kubectl get KafkaConnector
  • 1

2.从KafkaConnector自定义资源中找到需要重启的任务ID。任务ID是非负整数,从0开始:

kubectl describe KafkaConnector <kafka_connector_name>
  • 1

3.通过在 Kubernetes 中注释 KafkaConnector 资源,使用 ID 重新启动连接器任务:

kubectl annotate KafkaConnector <kafka_connector_name> strimzi.io/restart-task=0
  • 1

在此示例中,任务 0 被重新启动。

4.等待下一次协调发生(默认情况下每两分钟一次)。

只要协调进程检测到注释,Kafka 连接器任务就会重新启动。当 Kafka Connect 接受重启请求时,注释将从 KafkaConnector 自定义资源中删除。

十一、公开 Kafka Connect API

使用 Kafka Connect REST API 作为使用 KafkaConnector 资源的替代方法来管理连接器。 Kafka Connect REST API 作为在 <connect_cluster_name>-connect-api:8083 上运行的服务提供,其中 <connect_cluster_name> 是 Kafka Connect 集群的名称。该服务是在您创建 Kafka Connect 实例时创建的。

添加连接器配置的示例curl请求

curl -X POST \
  http://my-connect-cluster-connect-api:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "my-source-connector",
    "config":
    {
      "connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "file": "/opt/kafka/LICENSE",
      "topic":"my-topic",
      "tasksMax": "4",
      "type": "source"
    }
}'
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

该 API 只能在 Kubernetes 集群内访问。如果您想让 Kafka Connect API 可供在 Kubernetes 集群外部运行的应用程序访问,您可以通过创建以下功能之一来手动公开它:

  • LoadBalancer 或 NodePort 类型服务
  • 入口资源(仅限 Kubernetes)
  • OpenShift 路线(仅限 OpenShift)

如果您决定创建服务,请使用 <connect_cluster_name>-connect-api 服务的选择器中的标签来配置服务将流量路由到的 pod:

服务的选择器配置

# ...
selector:
  strimzi.io/cluster: my-connect-cluster (1)
  strimzi.io/kind: KafkaConnect
  strimzi.io/name: my-connect-cluster-connect (2)
#...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. Kubernetes 集群中的 Kafka Connect 自定义资源的名称。
  2. Cluster Operator 创建的 Kafka Connect 部署的名称。

您还必须创建一个允许来自外部客户端的 HTTP 请求的 NetworkPolicy。

允许向 Kafka Connect API 发出请求的 NetworkPolicy 示例

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: my-custom-connect-network-policy
spec:
  ingress:
  - from:
    - podSelector: (1)
        matchLabels:
          app: my-connector-manager
    ports:
    - port: 8083
      protocol: TCP
  podSelector:
    matchLabels:
      strimzi.io/cluster: my-connect-cluster
      strimzi.io/kind: KafkaConnect
      strimzi.io/name: my-connect-cluster-connect
  policyTypes:
  - Ingress

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

允许连接API的Pod的标签。

要在集群外部添加连接器配置,请使用在curl 命令中公开API 的资源的URL。

十二、禁止覆盖配置

连接器.client.config.override.policy
将 Connector.client.config.override.policy 属性设置为 None 以防止连接器配置覆盖 Kafka Connect 配置及其使用的消费者和生产者。

指定连接器覆盖策略的示例配置

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # ...
  config:
    connector.client.config.override.policy: None
# ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/782170
推荐阅读
相关标签
  

闽ICP备14008679号