赞
踩
本文会将从环境搭建到demo来全流程体验flinkcdc 3.0
包含了如下内容
- flink1.18 standalone搭建
- doris 1fe1be 搭建
- 整库数据同步
- 测试各同步场景
- 从检查点重启同步任务
下载flink 1.18.0 链接 : https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
解压 :
tar -zxvf flink-1.18.0-bin-scala_2.12.tgz
修改checkpoint 时间间隔 为3秒
vim conf/flink-conf.yaml
# 94 行(set nu 显示行)
taskmanager.numberOfTaskSlots: 2
# 148 行
execution.checkpointing.interval: 3000
启动
./bin/start-cluster.sh
访问页面 : http://127.0.0.1:8081
修改环境宿主机的内存映射
# 因为mac内部实现容器的方式不同,直接修改max_map_count值可能无法成功,所以在容器中进行修改
docker run -it --privileged --pid=host --name=change_count debian nsenter -t 1 -m -u -n -i sh
# 修改内存映射值(这个值通常用于限制一个进程打开的文件数量,默认是65530)
sysctl -w vm.max_map_count=2000000
# 退出容器
exit
使用docker compose 搭建doris 1fe1be集群
version: '3' services: docker-fe-01: image: "apache/doris:1.2.2-fe-arm" container_name: "doris-fe-01" hostname: "fe-01" environment: - FE_SERVERS=fe1:172.20.80.2:9010 - FE_ID=1 ports: - 8031:8030 - 9031:9030 volumes: - /Users/antg/docker/doris_1fe_1be/data/fe-01/doris-meta:/opt/apache-doris/fe/doris-meta - /Users/antg/docker/doris_1fe_1be/data/fe-01/conf:/opt/apache-doris/fe/conf - /Users/antg/docker/doris_1fe_1be/data/fe-01/log:/opt/apache-doris/fe/log networks: doris_net: ipv4_address: 172.20.80.2 docker-be-01: image: "apache/doris:1.2.2-be-arm" container_name: "doris-be-01" hostname: "be-01" depends_on: - docker-fe-01 environment: - FE_SERVERS=fe1:172.20.80.2:9010 - BE_ADDR=172.20.80.5:9050 ports: - 8041:8040 volumes: - /Users/antg/docker/doris_1fe_1be/data/be-01/storage:/opt/apache-doris/be/storage - /Users/antg/docker/doris_1fe_1be/data/be-01/conf:/opt/apache-doris/be/conf - /Users/antg/docker/doris_1fe_1be/data/be-01/script:/docker-entrypoint-initdb.d - /Users/antg/docker/doris_1fe_1be/data/be-01/log:/opt/apache-doris/be/log networks: doris_net: ipv4_address: 172.20.80.5 networks: doris_net: ipam: config: - subnet: 172.20.80.0/24
启动并验证是否启动成功
# 启动
docker-compose -f 1fe_1be.yaml up -d
# 连接doris
mysql -h127.0.0.1 -P9031 -uroot -p
# 创建数据库 doris_sync
> create database doris_sync;
使用本机之前安装的mysql
建测试库测试表
create database doris_sync; CREATE TABLE `a_0` ( `id` int NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; CREATE TABLE `a_1` ( `id` int NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; CREATE TABLE `abc` ( `id` int NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; CREATE TABLE `table_0` ( `id` int NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; CREATE TABLE `table_1` ( `id` int NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=101 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
其中 a_0,a_1 是分表,table_0,table_1是另外一个分表,abc是一个单独的表
初始化插入一些测试数据
INSERT INTO `a_0` (`id`, `name`) VALUES (1, 'a'); INSERT INTO `a_1` (`id`, `name`) VALUES (2, 'b'); BEGIN; INSERT INTO `abc` (`id`, `name`) VALUES (1, 'Luo Rui'); INSERT INTO `abc` (`id`, `name`) VALUES (2, 'Yung Wing Kuen'); INSERT INTO `abc` (`id`, `name`) VALUES (3, 'Chiang Chun Yu'); INSERT INTO `abc` (`id`, `name`) VALUES (4, 'Tang Ming'); INSERT INTO `abc` (`id`, `name`) VALUES (5, 'Man Wai Lam'); INSERT INTO `abc` (`id`, `name`) VALUES (6, 'Tin Tsz Ching'); INSERT INTO `abc` (`id`, `name`) VALUES (7, 'Doris Moore'); INSERT INTO `abc` (`id`, `name`) VALUES (8, 'Abe Mitsuki'); INSERT INTO `abc` (`id`, `name`) VALUES (9, 'Du Shihan'); INSERT INTO `abc` (`id`, `name`) VALUES (10, 'Chiang Chi Yuen'); COMMIT; BEGIN; INSERT INTO `table_0` (`id`, `name`) VALUES (1, 'Luo Rui'); INSERT INTO `table_0` (`id`, `name`) VALUES (2, 'Yung Wing Kuen'); INSERT INTO `table_0` (`id`, `name`) VALUES (3, 'Chiang Chun Yu'); INSERT INTO `table_0` (`id`, `name`) VALUES (4, 'Tang Ming'); INSERT INTO `table_0` (`id`, `name`) VALUES (5, 'Man Wai Lam'); INSERT INTO `table_0` (`id`, `name`) VALUES (6, 'Tin Tsz Ching'); INSERT INTO `table_0` (`id`, `name`) VALUES (7, 'Doris Moore'); INSERT INTO `table_0` (`id`, `name`) VALUES (8, 'Abe Mitsuki'); INSERT INTO `table_0` (`id`, `name`) VALUES (9, 'Du Shihan'); INSERT INTO `table_0` (`id`, `name`) VALUES (10, 'Chiang Chi Yuen'); COMMIT; INSERT INTO `table_1` (`id`, `name`) VALUES (100, 'tom');
我们在代码中开发过程中可能会用到容器的ip地址,例如上面的172.20.80.0/24这个网段,但是你会发现你是ping不通的,这里设计到了一些docker网络的一些知识,可以在网上看一下资料,这里只给出解决方法
安装路由转发镜像
# 现在连接器
brew install wenjunxiao/brew/docker-connector
# 加入路由
docker network ls --filter driver=bridge --format "{{.ID}}" | xargs docker network inspect --format "route {{range .IPAM.Config}}{{.Subnet}}{{end}}" >> /opt/homebrew/etc/docker-connector.conf
# 启动路由器
sudo /opt/homebrew/opt/docker-connector/bin/docker-connector -config /opt/homebrew/etc/docker-connector.conf
# 启动镜像
docker run -it -d --restart always --net host --cap-add NET_ADMIN --name connector wenjunxiao/mac-docker-connector
如果还是ping不通就重启一下上面的转发容器
这一步很重要,想要通过访问容器的ip就要完成这一步
下载flinkcdc 的依赖包放到flink目录下并解压
flinkcdc 依赖 : flink-cdc-3.0.0-bin.tar.gz
下载连接器 的依赖包放到flinkcdc的lib目录下
connector 依赖 :
配置FLINK_HOME环境变量
pwd
/Users/antg/software/flink-1.18.0/
export FLINK_HOME=/Users/antg/software/flink-1.18.0/
编写yaml文件 mysql-to-doris.yaml
################################################################################ # Description: Sync MySQL all tables to Doris ################################################################################ source: type: mysql hostname: localhost port: 3306 username: root password: 12345678 tables: doris_sync.\.* server-id: 5400-5404 server-time-zone: Asia/Shanghai sink: type: doris fenodes: 127.0.0.1:8031 username: root password: "" table.create.properties.light_schema_change: true table.create.properties.replication_num: 1 pipeline: name: Sync MySQL Database to Doris parallelism: 2
启动任务
bash bin/flink-cdc.sh mysql-to-doris.yaml
查看页面效果
这里可以看到同步的数据条数及大小
查看doris的数据及建表情况
可以看到表被自动创建并且数据也同步过来了
新增数据
INSERT INTO `a_0` (`id`, `name`) VALUES (3, 'jack');
更新数据
update a_0 set name='tom' where id=3;
删除数据
delete from a_0 where id=1;
没成功同步(已咨询社区是1.2.2的bug,在1.2.3修复了,正常来说会同步)
新增字段
alter table a_0 add column age int;
修改字段
# 修改名称
alter table a_0 change age age_range int;
# 修改字段类型
alter table a_0 modify column age_range varchar(100);
# 字段字段长度
alter table a_0 modify column age_range varchar(1200);
以上语句不会被同步
删除字段
alter table a_0 drop column age_range;
以上语句不会被同步
删除表
drop table a_0;
不会被同步
结论 :
1.新增数据,新增字段,修改数据会被实时同步到doris
2.delete数据不会被同步(已咨询社区是1.2.2的bug,在1.2.3修复了,正常来说会同步)
3.修改字段名称,类型,长度不会被同步(可能有参数可以开启)
4.删除字段不会被同步
5.删除表不会被同步
这里将使用flinkcdc3.0 新增的路由功能来实现分表合一的效果,而且也可以做到同步到doris的库名和表名换成自己想要的名称
将之前的mysql端数据清理,表重新建立
需求 :
将mysql端doris_sync同步到doris的ods库中
a_0,a_1 合并到ods_a表
abc 同步到 ods_abc表
table_0,table_1同步到 ods_table表
任务配置 route.yaml
################################################################################ # Description: Sync MySQL all tables to Doris ################################################################################ source: type: mysql hostname: localhost port: 3306 username: root password: 12345678 tables: doris_sync.\.* server-id: 5400-5404 server-time-zone: Asia/Shanghai sink: type: doris fenodes: 127.0.0.1:8031 username: root password: "" table.create.properties.light_schema_change: true table.create.properties.replication_num: 1 route: - source-table: doris_sync.a_\.* sink-table: ods.ods_a - source-table: doris_sync.abc sink-table: ods.ods_abc - source-table: doris_sync.table_\.* sink-table: ods.ods_table pipeline: name: Sync MySQL Database to Doris parallelism: 2
创建doris端ods库(不会自动创建库,必须手动创建)
create database ods;
将之前的任务停掉,启动这个任务
可以看到
1.多个分表在doris只创建了一个目标表
2.多个分表的数据都同步到了一个表中
非常棒的功能
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。