当前位置:   article > 正文

kafka k8s部署kafka connect(以kafka-connector-hana插件为例)_kafka connect部署

kafka connect部署

kafka-connect-sap地址

  1. 创建kafka connect镜像
# 创建插件文件夹及Dockerfile
mkdir plugins
# 此处创建kafka-connector-hana文件夹放kafka-connector-hana的插件以及hana的jdbc jar包
mkdir plugins/kafka-connector-hana

# 其他插件同理,将插件所需文件放到plugins的插件文件夹即可
# 将准备好的Jar包放到指定文件夹
mv ./ngdbc.jar ./kafka-connector-hana/plugins/kafka-connector-hana/
mv ./kafka-connector-hana_2.13-0.9.4.jar ./kafka-connector-hana/plugins/kafka-connector-hana/


vim Dockerfile
--------------
FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY ./plugins/ /opt/kafka/plugins/
USER 1001
--------------

# 打包镜像
docker build -t xx.xx.xx.xx/kafka-connector-hana:v1 .

# 上传镜像到自己的镜像仓库
docker push xx.xx.xx.xx/kafka-connector-hana:v1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  1. 创建kafka connector 部署文件 kafka-connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
#  annotations:
#  # use-connector-resources configures this KafkaConnect
#  # to use KafkaConnector resources to avoid
#  # needing to call the Connect REST API directly
#    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.4.0
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  image: xx.xx.xx.xx/kafka-connector-hana:v1
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  1. 创建kafka connector对应的service文件,使用NodePort方便后续使用 kafka-connect-svc.yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-connect-svc
  namespace: kafka
spec:
  type: NodePort
  ports:
    - protocol: TCP
      port: 8083
      nodePort: 30003
      targetPort: 8083
      name: "p8083"
  selector:
    strimzi.io/cluster: my-connect-cluster
    strimzi.io/kind: KafkaConnect
    strimzi.io/name: my-connect-cluster-connect
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  1. 部署kafka-connect
kubectl apply -f kafka-connect.yaml -n kafka
kubectl apply -f kafka-connect-svc.yaml -n kafka

# Pod启动后正常应该可以通过 http://IP:30003/connector-plugins 查看到connector插件
curl http://IP:30003/connector-plugins
  • 1
  • 2
  • 3
  • 4
  • 5
  1. 配置插件json数据源
# 以hana配置为例
vim hana-source.json
-----------------
{
  "name": "hana-source",
  "config": {
    "connector.class" : "com.sap.kafka.connect.source.hana.HANASourceConnector",
    "tasks.max" : "1",
    "topics" : "finance_trigger_change_log_topic",
    "connection.url" : "jdbc:sap://ip:30015/",
    "connection.user" : "user",
    "connection.password": "pass",
    "finance_trigger_change_log_topic.table.name": "\"SAPHANADB\".\"TEST\"",
    "auto.create":"true",
    "mode" : "incrementing",
    "finance_trigger_change_log_topic.poll.interval.ms": "5000",
    "finance_trigger_change_log_topic.incrementing.column.name": "COLUMN_NAME"
  }
}
-----------------

# 通过API创建connector
cat hana300-source.json | curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://ip:30003/connectors -d @-

# 查看connectors列表
curl http:/IP:30003/connectors
# 查看hana-source状态
curl http:/IP:30003/connectors/hana-source/status

# 删除hana-source
curl -X DELETE -H "Accept:application/json" -H "Content-Type:application/json" http://IP:30003/connectors/hana-source
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  1. 附connectors常用API
GET /connectors – 返回所有正在运行的connector名
POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
GET /connectors/{name} – 获取指定connetor的信息
GET /connectors/{name}/config – 获取指定connector的配置信息
PUT /connectors/{name}/config – 更新指定connector的配置信息
GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息
PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
PUT /connectors/{name}/resume – 恢复一个被暂停的connector
POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/638866
推荐阅读
相关标签
  

闽ICP备14008679号