赞
踩
执行命令创建项目
$ oc new-project debezium-cdc
$ oc new-app docker.io/debezium/example-mysql:latest -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw -n debezium-cdc
$ MYSQL_POD=$(oc get pod -l deployment=example-mysql -o jsonpath={.items[0].metadata.name} -n debezium-cdc)
$ oc exec $MYSQL_POD -it -- mysql -u mysqluser -pmysqlpw inventory
mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
在 OpenShift 的控制台 Administrator 视图的 OperatorHub 中找到名为 Strimzi 或 AMQ Streams 的 Operator(Strimzi 是社区版 Kafka,AMQ Streams 是 RedHat 版的 Kafka),然后接受默认配置安装。
安装完 Operator 后,进入 “开发者” 视图的 “+添加” 菜单。在右侧页面中点击 “添加” 下方的 “添加至项目” 图标,然后在搜索条中查找到 “kafka”,最后点击 “创建” 按钮。
在 “创建 Kafka“ 页面中接受默认配置,然后点击 “创建” 按钮。在 Kafka 示例完成部署后拓扑界面如下:
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster namespace: debezium-cdc annotations: strimzi.io/use-connector-resources: 'true' spec: config: group.id: connect-cluster offset.storage.topic: connect-cluster-offsets config.storage.topic: connect-cluster-configs status.storage.topic: connect-cluster-status config.storage.replication.factor: -1 offset.storage.replication.factor: -1 status.storage.replication.factor: -1 build: output: type: docker image: image-registry.openshift-image-registry.svc:5000/debezium-cdc/debezium-connect-mysql:latest plugins: - name: debezium-connector-mysql artifacts: - type: zip url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/1.9.5.Final-redhat-00001/debezium-connector-mysql-1.9.5.Final-redhat-00001-plugin.zip tls: trustedCertificates: - secretName: my-cluster-cluster-ca-cert certificate: ca.crt version: 3.1.0 replicas: 1 bootstrapServers: 'my-cluster-kafka-bootstrap:9093'
$ oc get build -n debezium-cdc
NAME TYPE FROM STATUS STARTED DURATION
my-connect-cluster-connect-build-1 Docker Dockerfile Complete 5 hours ago 51s
$ oc get is debezium-streams-connect -n debezium-cdc
NAME IMAGE REPOSITORY TAGS UPDATED
debezium-streams-connect image-registry.openshift-image-registry.svc:5000/debezium-cdc/debezium-streams-connect latest 5 hours ago
$ oc expose svc/my-connect-cluster-connect-api -n debezium-cdc
$ CONNECT_API=$(oc get route my-connect-cluster-connect-api -o jsonpath='{ .spec.host }' -n debezium-cdc)
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-connector-mysql namespace: debezium-cdc labels: strimzi.io/cluster: my-connect-cluster spec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 1 config: tasks.max: 1 database.hostname: example-mysql database.port: 3306 database.user: debezium database.password: dbz database.server.id: 184054 database.server.name: mysql database.include.list: inventory database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9093 database.history.kafka.topic: schema-changes.inventory
$ oc get KafkaConnector -n debezium-cdc NAME CLUSTER CONNECTOR CLASS MAX TASKS READY my-connector-mysql my-connect-cluster io.debezium.connector.mysql.MySqlConnector 1 True $ oc describe KafkaConnector my-connector-mysql -n debezium-cdc 。。。 Status: Conditions: Last Transition Time: 2022-08-13T10:28:02.625375Z Status: True Type: Ready Connector Status: Connector: State: RUNNING worker_id: 10.128.0.67:8083 Name: my-connector-mysql Tasks: Type: source Observed Generation: 1 Tasks Max: 1 Topics:
$ oc new-app quay.io/efeluzy/quarkus-kafka-consumer:latest -e mp.messaging.incoming.mytopic-subscriber.topic=mysql.inventory.customers -n debezium-cdc
$ oc expose svc quarkus-kafka-consumer -n debezium-cdc
$ oc get kafkatopic mysql.inventory.customers
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY
mysql.inventory.customers my-cluster 1 3 True
$ curl http://$(oc get route quarkus-kafka-consumer -o jsonpath='{ .spec.host }' -n debezium-cdc)/stream
mysql> update customers set first_name='Test' where id = 1003;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
data: Kafka Offset=4; message={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"after":{"id":1003,"first_name":"erfin","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"1.1.2.Final","connector":"mysql","name":"dbserver1","ts_ms":1595073286000,"snapshot":"false","db":"inventory","table":"customers","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":364,"row":0,"thread":9,"query":null},"op":"u","ts_ms":1595073286806,"transaction":null}}
https://debezium.io/documentation/reference/stable/operations/openshift.html
https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-connector-properties
https://access.redhat.com/documentation/en-us/red_hat_integration/2022.q3/html/getting_started_with_debezium/starting-services
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。