```bash
# Create the plugin directory and a Dockerfile
mkdir plugins
# Create a kafka-connector-hana folder for the kafka-connector-hana plugin and the HANA JDBC jar
mkdir plugins/kafka-connector-hana
# Other plugins work the same way: put each plugin's files into its own folder under plugins/

# Move the prepared jars into the plugin folder
mv ./ngdbc.jar ./plugins/kafka-connector-hana/
mv ./kafka-connector-hana_2.13-0.9.4.jar ./plugins/kafka-connector-hana/

vim Dockerfile
--------------
FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY ./plugins/ /opt/kafka/plugins/
USER 1001
--------------

# Build the image
docker build -t xx.xx.xx.xx/kafka-connector-hana:v1 .
# Push the image to your own registry
docker push xx.xx.xx.xx/kafka-connector-hana:v1
```
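Before pushing, it can be worth confirming that the jars ended up where Kafka Connect looks for plugins. A minimal check, assuming the image tag built above:

```bash
# List the plugin directory baked into the image; both ngdbc.jar and the
# kafka-connector-hana jar should appear here.
docker run --rm --entrypoint ls xx.xx.xx.xx/kafka-connector-hana:v1 -l /opt/kafka/plugins/kafka-connector-hana
```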
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  # annotations:
  #   # use-connector-resources configures this KafkaConnect
  #   # to use KafkaConnector resources to avoid
  #   # needing to call the Connect REST API directly
  #   strimzi.io/use-connector-resources: "true"
spec:
  version: 3.4.0
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  image: xx.xx.xx.xx/kafka-connector-hana:v1
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
```
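If the `strimzi.io/use-connector-resources: "true"` annotation above is uncommented, connectors can also be managed declaratively as KafkaConnector custom resources instead of through the Connect REST API. A sketch of what that could look like for the HANA source configured later in this post (the values are placeholders taken from that example, adjust them for your environment):

```bash
# Declarative alternative to the REST API: a KafkaConnector resource managed by Strimzi.
# Requires the use-connector-resources annotation on the KafkaConnect cluster above.
# The config keys mirror the hana-source.json example shown later in this post.
cat <<'EOF' | kubectl apply -n kafka -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: hana-source
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: com.sap.kafka.connect.source.hana.HANASourceConnector
  tasksMax: 1
  config:
    topics: finance_trigger_change_log_topic
    connection.url: jdbc:sap://ip:30015/
    connection.user: user
    connection.password: pass
    mode: incrementing
    finance_trigger_change_log_topic.incrementing.column.name: COLUMN_NAME
EOF
```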
```yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka-connect-svc
  namespace: kafka
spec:
  type: NodePort
  ports:
    - protocol: TCP
      port: 8083
      nodePort: 30003
      targetPort: 8083
      name: "p8083"
  selector:
    strimzi.io/cluster: my-connect-cluster
    strimzi.io/kind: KafkaConnect
    strimzi.io/name: my-connect-cluster-connect
```
```bash
kubectl apply -f kafka-connect.yaml -n kafka
kubectl apply -f kafka-connect-svc.yaml -n kafka
```
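Before calling the REST API, it helps to confirm that the Connect pod is ready and the NodePort service exists. A quick check, assuming the names used above:

```bash
# The Connect pod carries the strimzi.io/cluster label used in the Service selector above
kubectl get pods -n kafka -l strimzi.io/cluster=my-connect-cluster
# The NodePort service exposing port 8083 as 30003
kubectl get svc kafka-connect-svc -n kafka
```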
```bash
# Once the pod is running, the installed connector plugins should be visible at http://IP:30003/connector-plugins
curl http://IP:30003/connector-plugins
```
```bash
# Using the HANA configuration as an example
vim hana-source.json
-----------------
{
    "name": "hana-source",
    "config": {
        "connector.class": "com.sap.kafka.connect.source.hana.HANASourceConnector",
        "tasks.max": "1",
        "topics": "finance_trigger_change_log_topic",
        "connection.url": "jdbc:sap://ip:30015/",
        "connection.user": "user",
        "connection.password": "pass",
        "finance_trigger_change_log_topic.table.name": "\"SAPHANADB\".\"TEST\"",
        "auto.create": "true",
        "mode": "incrementing",
        "finance_trigger_change_log_topic.poll.interval.ms": "5000",
        "finance_trigger_change_log_topic.incrementing.column.name": "COLUMN_NAME"
    }
}
-----------------

# Create the connector through the REST API
cat hana-source.json | curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://IP:30003/connectors -d @-
# List the connectors
curl http://IP:30003/connectors
# Check the status of hana-source
curl http://IP:30003/connectors/hana-source/status
# Delete hana-source
curl -X DELETE -H "Accept:application/json" -H "Content-Type:application/json" http://IP:30003/connectors/hana-source
```
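Once the status reports RUNNING, rows from the HANA table should start landing on the topic. One way to sanity-check this is a throwaway console consumer inside the cluster; this sketch reuses the Strimzi image from above and assumes a plain listener on port 9092 is enabled on the Kafka cluster:

```bash
# One-off console consumer pod reading the topic written by the HANA source connector
kubectl -n kafka run kafka-consumer -ti --rm --restart=Never \
  --image=quay.io/strimzi/kafka:0.34.0-kafka-3.4.0 -- \
  bin/kafka-console-consumer.sh \
    --bootstrap-server my-cluster-kafka-bootstrap:9092 \
    --topic finance_trigger_change_log_topic \
    --from-beginning
```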
GET /connectors – Returns the names of all running connectors.
POST /connectors – Creates a new connector; the request body must be JSON containing a `name` field (the connector's name) and a `config` field (a JSON object with the connector's configuration).
GET /connectors/{name} – Gets information about the specified connector.
GET /connectors/{name}/config – Gets the configuration of the specified connector.
PUT /connectors/{name}/config – Updates the configuration of the specified connector.
GET /connectors/{name}/status – Gets the status of the specified connector, including whether it is running, stopped, or failed; if an error occurred, the error details are included.
GET /connectors/{name}/tasks – Gets the tasks currently running for the specified connector.
GET /connectors/{name}/tasks/{taskid}/status – Gets the status of a task of the specified connector.
PUT /connectors/{name}/pause – Pauses the connector and its tasks, stopping data processing until it is resumed.
PUT /connectors/{name}/resume – Resumes a paused connector.
POST /connectors/{name}/restart – Restarts a connector; mostly used when the connector has failed.
POST /connectors/{name}/tasks/{taskId}/restart – Restarts a task, typically because it has failed.
DELETE /connectors/{name} – Deletes a connector, stopping all of its tasks and removing its configuration.
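As a quick illustration of how a few of these endpoints fit together, here is a sketch against the NodePort used above, reusing the hana-source connector from the earlier example (the PUT /config body assumes the same settings as hana-source.json, without the outer name/config wrapper):

```bash
CONNECT=http://IP:30003

# Pause the connector while the source table is being maintained
curl -X PUT $CONNECT/connectors/hana-source/pause

# Later, resume it and confirm it is back to RUNNING
curl -X PUT $CONNECT/connectors/hana-source/resume
curl $CONNECT/connectors/hana-source/status

# Update the configuration in place; PUT /config takes only the config object
curl -X PUT -H "Content-Type:application/json" \
  $CONNECT/connectors/hana-source/config \
  -d '{
        "connector.class": "com.sap.kafka.connect.source.hana.HANASourceConnector",
        "tasks.max": "1",
        "topics": "finance_trigger_change_log_topic",
        "connection.url": "jdbc:sap://ip:30015/",
        "connection.user": "user",
        "connection.password": "pass",
        "mode": "incrementing",
        "finance_trigger_change_log_topic.incrementing.column.name": "COLUMN_NAME"
      }'

# Restart a single failed task (task ids start at 0)
curl -X POST $CONNECT/connectors/hana-source/tasks/0/restart
```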