Real-Time Synchronization of MySQL Data to Doris with Flink CDC


Reference:

Technical deep dive | Doris Connector with Flink CDC: exactly-once ingestion of sharded MySQL databases and tables (Alibaba Cloud developer community)

Logical diagram: MySQL binlog → Flink CDC source → Flink SQL job → Doris sink (diagram image not reproduced here).

1. Flink environment

https://flink.apache.org/zh/

  • Download Flink 1.15.1
wget https://dlcdn.apache.org/flink/flink-1.15.1/flink-1.15.1-bin-scala_2.12.tgz

  • Extract the archive and enter the directory
tar -zxvf flink-1.15.1-bin-scala_2.12.tgz && cd flink-1.15.1

  • Edit the configuration: set rest.bind-address to 0.0.0.0 so the WebUI is reachable from other hosts
vi conf/flink-conf.yaml

  • Download the connector jars into the lib/ directory of the Flink installation
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
wget https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.14_2.12/1.0.3/flink-doris-connector-1.14_2.12-1.0.3.jar
(Note: this Doris connector build targets Flink 1.14; if you hit compatibility errors with Flink 1.15, use a connector build matching your Flink version.)

  • Start Flink
./bin/start-cluster.sh

  • Open the WebUI

http://192.168.0.158:8081

2. MySQL table and data

  1. Enable binlog: inside the MySQL container, edit /etc/mysql/mysql.cnf as below, then restart MySQL
[mysqld]
log_bin=mysql_bin
binlog-format=Row
server-id=1
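After the restart, you can confirm from the MySQL prompt that the binlog is active and in row format:

```sql
-- Run in the MySQL client after the restart
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW
```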

  2. In the MySQL command line, create the database emp and the table employee, then load the sample data:
CREATE DATABASE emp;
USE emp;
CREATE TABLE employee (
  emp_no INT NOT NULL,
  birth_date DATE NOT NULL,
  first_name VARCHAR(14) NOT NULL,
  last_name VARCHAR(16) NOT NULL,
  gender ENUM ('M','F') NOT NULL,
  hire_date DATE NOT NULL,
  PRIMARY KEY (emp_no)
);

INSERT INTO `employee` VALUES
(10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
(10003,'1959-12-03','Parto','Bamford','M','1986-08-28'),
(10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'),
(10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12'),
(10006,'1953-04-20','Anneke','Preusig','F','1989-06-02'),
(10007,'1957-05-23','Tzvetan','Zielinski','F','1989-02-10'),
(10008,'1958-02-19','Saniya','Kalloufi','M','1994-09-15'),
(10009,'1952-04-19','Sumant','Peac','F','1985-02-18'),
(10010,'1963-06-01','Duangkaew','Piveteau','F','1989-08-24'),
(10011,'1953-11-07','Mary','Sluis','F','1990-01-22'),
(10012,'1960-10-04','Patricio','Bridgland','M','1992-12-18'),
(10013,'1963-06-07','Eberhardt','Terkki','M','1985-10-20'),
(10014,'1956-02-12','Berni','Genin','M','1987-03-11'),
(10015,'1959-08-19','Guoxiang','Nooteboom','M','1987-07-02'),
(10016,'1961-05-02','Kazuhito','Cappelletti','M','1995-01-27'),
(10017,'1958-07-06','Cristinel','Bouloucos','F','1993-08-03'),
(10018,'1954-06-19','Kazuhide','Peha','F','1987-04-03'),
(10019,'1953-01-23','Lillian','Haddadi','M','1999-04-30'),
(10020,'1952-12-24','Mayuko','Warwick','M','1991-01-26');
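A quick sanity check that the seed data loaded (the INSERT above adds 20 rows):

```sql
SELECT COUNT(*) FROM employee;  -- expect 20
```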

3. Doris table

  1. Connect to Doris through the MySQL client and create the database demo and the table employee_info:
CREATE DATABASE demo;
USE demo;
CREATE TABLE employee_info (
  emp_no int NOT NULL,
  birth_date date,
  first_name varchar(20),
  last_name varchar(20),
  gender char(2),
  hire_date date,
  database_name varchar(50),
  table_name varchar(200)
)
UNIQUE KEY(`emp_no`, `birth_date`)
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1
PROPERTIES ( "replication_allocation" = "tag.location.default: 1" );
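To double-check that the DDL took effect, the schema can be inspected from the same client:

```sql
-- Standard MySQL-protocol statement, also supported by Doris
SHOW CREATE TABLE demo.employee_info;
```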

4. Flink tables and the sync job

  • Start the Flink SQL client
./bin/sql-client.sh embedded

  • Enable checkpointing
Flink jobs take periodic checkpoints that record the binlog position; when the job fails over, it resumes from the last recorded position.
A 60-second interval is recommended for production; 10 s keeps this demo responsive.
Flink SQL> SET 'execution.checkpointing.interval' = '10s';

  • Create the MySQL CDC source table
Flink SQL> CREATE TABLE employee_source (
  database_name STRING METADATA VIRTUAL,
  table_name STRING METADATA VIRTUAL,
  emp_no INT NOT NULL,
  birth_date DATE,
  first_name STRING,
  last_name STRING,
  gender STRING,
  hire_date DATE,
  PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3336',
  'username' = 'root',
  'password' = '1234.abcd',
  'database-name' = 'emp',
  'table-name' = 'employee'
);

Query the data:

Flink SQL> select * from employee_source limit 10;

  • Create the Doris sink table
Flink SQL> CREATE TABLE cdc_doris_sink (
  emp_no INT,
  birth_date STRING,
  first_name STRING,
  last_name STRING,
  gender STRING,
  hire_date STRING,
  database_name STRING,
  table_name STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = 'localhost:8030',
  'table.identifier' = 'demo.employee_info',
  'username' = 'root',
  'password' = '1234.abcd'
);
Parameter notes:
  • connector: selects the doris connector
  • fenodes: Doris FE node IP address and HTTP port
  • table.identifier: target Doris database and table name
  • username: Doris user name
  • password: Doris user password

Query the data:

Flink SQL> select * from cdc_doris_sink;

  • Submit the data synchronization job
Flink SQL> INSERT INTO cdc_doris_sink (emp_no, birth_date, first_name, last_name, gender, hire_date, database_name, table_name)
SELECT emp_no,
       CAST(birth_date AS STRING) AS birth_date,
       first_name,
       last_name,
       gender,
       CAST(hire_date AS STRING) AS hire_date,
       database_name,
       table_name
FROM employee_source;

The job now appears as running in the WebUI, confirming it was submitted successfully.

Check the data in the Doris table:

mysql> select * from employee_info;
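To confirm the pipeline is live end to end, insert a fresh row on the MySQL side and watch it arrive in Doris (emp_no 10021 is an arbitrary test value not present in the seed data):

```sql
-- In the source MySQL instance: add a new employee row
INSERT INTO emp.employee VALUES
  (10021,'1960-03-15','Test','User','M','2000-01-01');

-- In the Doris client, shortly afterwards (latency depends on the
-- checkpoint interval), the row should be visible:
SELECT * FROM demo.employee_info WHERE emp_no = 10021;
```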

5. Troubleshooting

NoResourceAvailableException: Could not acquire the minimum required resources

In the Flink directory, edit conf/flink-conf.yaml and raise taskmanager.numberOfTaskSlots (e.g. to 4); a common rule of thumb is one slot per CPU core. Restart the cluster for the change to take effect.
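The relevant fragment of conf/flink-conf.yaml after the change (the value 4 assumes a 4-core machine; adjust to your hardware):

```yaml
# one task slot per CPU core is a common starting point
taskmanager.numberOfTaskSlots: 4
```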
