In a project we ran into a common situation: an existing database already holds a large amount of data, and all of it needs to be synchronized and migrated into a new database. How should we handle this?
In this installment we build data synchronization based on Debezium and Kafka.
Debezium is a distributed platform built on the change data capture features provided by different databases (for example, logical decoding in PostgreSQL). Debezium is deployed on top of Apache Kafka Connect.
Kafka Connect is a framework and runtime for implementing and operating:
source connectors, such as Debezium, which ingest data into Kafka (in the practical example that follows, Debezium ingests MySQL data into Kafka);
sink connectors, which write data from Kafka topics out to other systems. Many kinds of target systems are possible; in our example, Kafka topics are written into a PostgreSQL database.
This deployment requires Docker to be installed first.
Debezium is installed and deployed with Docker, as follows ⬇
docker-compose.yaml
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:2.0
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:2.0
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  connect:
    image: quay.io/debezium/connect:2.0
    ports:
      - 8083:8083
      - 5005:5005
    links:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_source_connect_statuses
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "9093:8080"
    environment:
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
    links:
      - kafka
  debezium-ui:
    image: debezium/debezium-ui:2.0
    ports:
      - "8080:8080"
    environment:
      - KAFKA_CONNECT_URIS=http://connect:8083
    links:
      - connect
Deployment command:
docker-compose -f docker-compose.yaml -p debezium up -d
After deployment, the Docker container list looks like this:
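The same container list can be printed from the command line of the compose project started above:
docker-compose -f docker-compose.yaml -p debezium ps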
Kafka-ui URL: http://localhost:9093
Debezium-ui URL: http://localhost:8080
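You can also confirm that the Kafka Connect REST API on port 8083 is reachable; querying the root endpoint returns the Connect version information:
curl -H "Accept:application/json" http://localhost:8083/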
Both source connectors and sink connectors live as JAR packages in the /kafka/connect directory of the Connect container.
The Connect container already ships with Debezium's official source connectors:
The sink connector has to be registered yourself: Kafka-Connect-JDBC (create a kafka-connect-jdbc directory, download the JAR into it, and restart Connect).
Registering the Sink Connector
# Create the kafka-connect-jdbc directory inside the Docker container
docker exec <connect-container-id> mkdir /kafka/connect/kafka-connect-jdbc
# Download the JAR locally
wget https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/5.3.2/kafka-connect-jdbc-5.3.2.jar
# Copy the JAR into the Docker container
docker cp kafka-connect-jdbc-5.3.2.jar <connect-container-id>:/kafka/connect/kafka-connect-jdbc
# Restart the connect container
docker restart <connect-container-id>
Data migration goes through the following steps:
1) Start the source database;
2) Register the Source Connector. The Source Connector listens for data changes in the source database and publishes them to Kafka topics, one topic per table; each topic contains the operations performed on individual records of that table (insert, update, delete, and so on);
3) Start the target database;
4) Register the Sink Connector. The Sink Connector consumes the Kafka topics, connects to the target database via JDBC, and applies the corresponding operation to the table records based on the information in the topic.
This time the source database is started as a container:
docker run -d --name source-postgres -p 15432:5432 -e POSTGRES_PASSWORD=123456 -e POSTGRES_USER=debe postgres:12.6
Register the source connector through the Debezium UI page.
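For reference, an equivalent registration through the Connect REST API might look like the sketch below. The connector class and option names come from the Debezium PostgreSQL connector; the connector name, host, port, credentials, database name and topic.prefix (postgres, chosen to match the topic names shown later) are assumptions for this environment, and plugin.name=pgoutput relates to the decoding-plugin note below:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d \
'{
  "name": "source-connector-postgres",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "10.3.73.160",
    "database.port": "15432",
    "database.user": "debe",
    "database.password": "123456",
    "database.dbname": "postgres",
    "topic.prefix": "postgres",
    "plugin.name": "pgoutput"
  }
}'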
A few points to note:
The Debezium Postgres source connector requires wal_level to be set to logical on the Postgres side; change the setting in Postgres's postgresql.conf file (wal_level = logical) and restart Postgres (a sketch follows these notes);
Postgres also needs a logical decoding plugin; Debezium supports two decoding plugins:
Decoderbufs: the Debezium default, maintained by the Debezium project;
pgoutput: built into Postgres 10+; when using this plugin, configure plugin.name=pgoutput
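A minimal sketch of checking and changing wal_level for the source container started above (the container name source-postgres and user debe come from the docker run command; ALTER SYSTEM is simply one way to persist the setting instead of editing postgresql.conf by hand):
# check the current setting
docker exec -it source-postgres psql -U debe -d postgres -c "SHOW wal_level;"
# change it, then restart the container so the new value takes effect
docker exec -it source-postgres psql -U debe -d postgres -c "ALTER SYSTEM SET wal_level = 'logical';"
docker restart source-postgres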
docker run -d --name target-postgres -p 25432:5432 -e POSTGRES_PASSWORD=123456 -e POSTGRES_USER=debe postgres:12.6
Register through the API provided by Connect
Add a connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d \
'{
  "name": "sink-connector-postgres",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "postgres.public.test_source",
    "connection.url": "jdbc:postgresql://10.3.73.160:25432/postgres?user=debe&password=123456",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "auto.create": "true",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.fields": "id",
    "pk.mode": "record_key"
  }
}'
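Once registered, the connector's state can be checked with Kafka Connect's standard status endpoint:
curl -s http://localhost:8083/connectors/sink-connector-postgres/status
# "state" should be RUNNING for both the connector and its task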
Migrating table data from the source database into Kafka
Create the tables test_source and test_source1
test_source&test_source1.sql
-- test_source
create table if not exists public.test_source
(
    id   integer not null
        constraint test_source_pk
            primary key,
    name varchar(64)
);
alter table public.test_source
    owner to debe;
insert into public.test_source (id, name) values (1, 'a');
-- test_source1
create table if not exists public.test_source1
(
    id   integer not null
        constraint test_source1_pk
            primary key,
    name varchar(64)
);
alter table public.test_source1
    owner to debe;
insert into public.test_source1 (id, name) values (1, 'a1');
Kafka before the data is created ⬇
Kafka after the data is created ⬇
After the tables test_source and test_source1 are created in the source database, two topics appear in Kafka:
postgres.public.test_source and postgres.public.test_source1, corresponding one-to-one with the two tables; the messages in each topic describe the operations performed on that table's records (here, one inserted record).
The captured tables can be filtered through the connector configuration; for example, with "table.include.list": "public.test_source" only one topic appears: postgres.public.test_source
Migrating data from Kafka into the target database
After the Sink Connector is registered, a new consumer appears in Kafka and consumes postgres.public.test_source (specified by "topics": "postgres.public.test_source" in the sink connector configuration);
In the corresponding target database (specified by "connection.url": "jdbc:postgresql://10.3.73.160:25432/postgres?user=debe&password=123456" in the sink connector configuration) a new table public.test_source is created, and its data stays in sync with public.test_source in the source database.
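A quick command-line check of the target table (the container name target-postgres and user debe come from the docker run command above; the postgres database name matches the connection.url):
docker exec -it target-postgres psql -U debe -d postgres -c "select * from public.test_source;"
# should show the same rows as public.test_source in the source database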
This time the MySQL source database is started with docker:
docker run -d --name source-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.0
Registering the MySQL source connector
There are two ways to register the MySQL source:
1. Add it directly in the Debezium UI
2. Register it by calling the Kafka Connect API
Adding it directly in the Debezium UI
Choose the MySQL source
Registering by calling the Kafka Connect API
Add a connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d \
'{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "topic.prefix": "dbserver1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}'
Here database.user / database.password are the database credentials, and database.include.list limits capture to the inventory database. The RegexRouter transform rewrites topic names such as dbserver1.inventory.customers to just the table name (customers), which is why the sink connector registered later subscribes to the topic customers.
Verifying the Source Connector registration result
Before registering the connector:
After registering the connector:
The additional topics correspond to the MySQL source tables; connecting to the MySQL database shows the tables:
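The same tables can be listed from the command line of the MySQL container (the container name source-mysql and the root password come from the docker run command above; the inventory sample database ships with the debezium/example-mysql image):
docker exec -it source-mysql mysql -uroot -pdebezium -e "show tables from inventory;"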
The message sync information can be seen in UI for Apache Kafka.
Open the Debezium UI (http://localhost:8080/) to see the MySQL connection.
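Besides the two UIs, the registration can also be verified directly against the Connect REST API:
curl -s http://localhost:8083/connectors
# the list should now contain "inventory-connector"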
This time the target PostgreSQL is started with Docker as well:
docker run -d --name target-postgres -p 5432:5432 -e POSTGRES_USER=postgresuser -e POSTGRES_PASSWORD=postgrespw -e POSTGRES_DB=inventory debezium/postgres:9.6
Add a connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d \
'{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "customers",
    "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "auto.create": "true",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.fields": "id",
    "pk.mode": "record_key"
  }
}'
The topics field is the topic to migrate (subscription is per table here): the customers topic produced by the source connector's RegexRouter above.
After registering the PostgreSQL connector, no connector client information is shown in the Debezium UI, but it can be seen in UI for Apache Kafka:
With the installation steps complete, take the customers table as an example and run CUD (create, update, delete) statements to synchronize data from the MySQL database to PostgreSQL.
Existing data in the MySQL database:
Manually insert a record into the customers table of the MySQL database ⬇
customers.sql
insert into customers(id,first_name,last_name,email) values(1005,'test','one','123456@qq.com');
One more record appears in the customers table of the PostgreSQL database:
A new message is added to Kafka, completing the data synchronization:
The consumed message looks like this:
topics-customers.json
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1005,
      "first_name": "test",
      "last_name": "one",
      "email": "123456@qq.com"
    },
    "source": {
      "version": "2.0.1.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1672024796000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 392,
      "row": 0,
      "thread": 16,
      "query": null
    },
    "op": "c",
    "ts_ms": 1672024796396,
    "transaction": null
  }
}
The important part is the information in the "payload" JSON: here "op": "c" indicates a create (insert); updates and deletes would appear with "op": "u" and "op": "d" respectively.
Note that this JSON structure is a format defined by Debezium.
A Debezium JSON message usually carries the schema definition first, with the actual payload at the end.
The detailed format definition can be found at: https://debezium.io/documentation/reference/1.6/connectors/mysql.html
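The walkthrough above only exercises the insert path. To try the update and delete paths as well, one could run statements like the following against the MySQL customers table (the values are just an example continuing the inserted row); they produce change events with "op": "u" and "op": "d", and, because the sink uses upsert mode with delete.enabled=true, propagate to PostgreSQL as an update and a delete:
update customers set first_name = 'test2' where id = 1005;
delete from customers where id = 1005;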
Through the steps above, we used Debezium in a Docker environment to synchronize data into Kafka and on to the target database. That's all for this installment on data synchronization and migration; feel free to bookmark it for further study!
Copyright notice: this article was compiled from hands-on practice by the Digital China (神州数码) Wuhan Cloud Base team; please credit the source when reposting.