
Flink CDC MySQL-to-MySQL Synchronization: Error Notes

 

0. Required JARs

Central Repository: org/apache/flink/flink-connector-jdbc/3.1.1-1.16

Central Repository: com/ververica/flink-sql-connector-mysql-cdc/3.0.0

Or download them from mvnrepository.com:
https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc

https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc
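For a scripted setup, the two connector JARs can be fetched from Maven Central straight into Flink's lib/ directory. This is a sketch; the versions below are assumptions matched to the Flink 1.17.1 environment used later in this article, so adjust them to your release:

```shell
# Sketch: fetch the connector jars from Maven Central (versions are assumptions).
CDC_VERSION=2.4.1            # flink-connector-mysql-cdc build used later in this article
JDBC_VERSION=3.1.1-1.17      # flink-connector-jdbc build matching Flink 1.17
CDC_URL="https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/${CDC_VERSION}/flink-connector-mysql-cdc-${CDC_VERSION}.jar"
JDBC_URL="https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/${JDBC_VERSION}/flink-connector-jdbc-${JDBC_VERSION}.jar"
echo "$CDC_URL"
echo "$JDBC_URL"
# wget -P lib/ "$CDC_URL" "$JDBC_URL"   # uncomment to download into Flink's lib/
```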

1. Start the Flink SQL client
[appuser@whtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh


2. Create the source table

Problem 1: Encountered "("
Fix: remove the display width, changing int(11) to int (Flink SQL's INT type takes no length argument).

  Flink SQL> CREATE TABLE `t_user` (
  > `uid` int(11) NOT NULL AUTO_INCREMENT COMMENT 'user id',
  > `did` int(11) DEFAULT NULL COMMENT 'dept id',
  > `username` varchar(14) DEFAULT NULL,
  > `add_time` datetime DEFAULT NULL,
  > PRIMARY KEY (`uid`) NOT ENFORCED
  > ) WITH (
  > 'connector' = 'mysql-cdc',
  > 'hostname' = '192.25.34.2',
  > 'port' = '3306',
  > 'username' = '*******',
  > 'password' = '*******',
  > 'database-name' = 'test',
  > 'table-name' = 't_user'
  > );
  [ERROR] Could not execute SQL statement. Reason:
  org.apache.flink.sql.parser.impl.ParseException: Encountered "(" at line 2, column 12.
  Was expecting one of:
  "CONSTRAINT" ...
  "NOT" ...
  "NULL" ...
  "PRIMARY" ...
  "UNIQUE" ...
  "COMMENT" ...
  "METADATA" ...
  ")" ...
  "," ...
  "MULTISET" ...
  "ARRAY" ...
  Flink SQL>

Problem 2: Encountered "AUTO_INCREMENT"
Fix: remove AUTO_INCREMENT (Flink DDL does not support it; auto-increment remains a property of the underlying MySQL table).

  Flink SQL> CREATE TABLE `t_user` (
  > `uid` int NOT NULL AUTO_INCREMENT COMMENT 'user id',
  > `did` int DEFAULT NULL COMMENT 'dept id',
  > `username` varchar(14) DEFAULT NULL,
  > `add_time` datetime DEFAULT NULL,
  > PRIMARY KEY (`uid`) NOT ENFORCED
  > ) WITH (
  > 'connector' = 'mysql-cdc',
  > 'hostname' = '192.25.34.2',
  > 'port' = '3306',
  > 'username' = '*******',
  > 'password' = '*******',
  > 'database-name' = 'test',
  > 'table-name' = 't_user'
  > );
  [ERROR] Could not execute SQL statement. Reason:
  org.apache.flink.sql.parser.impl.ParseException: Encountered "AUTO_INCREMENT" at line 2, column 22.
  Was expecting one of:
  "CONSTRAINT" ...
  "PRIMARY" ...
  "UNIQUE" ...
  "COMMENT" ...
  "METADATA" ...
  ")" ...
  "," ...
  "MULTISET" ...
  "ARRAY" ...
  Flink SQL>

Problem 3: Encountered "DEFAULT"
Fix: remove the DEFAULT clauses.

  Flink SQL> CREATE TABLE `t_user` (
  > `uid` int NOT NULL COMMENT 'user id',
  > `did` int DEFAULT NULL COMMENT 'dept id',
  > `username` varchar(14) DEFAULT NULL,
  > `add_time` datetime DEFAULT NULL,
  > PRIMARY KEY (`uid`) NOT ENFORCED
  > ) WITH (
  > 'connector' = 'mysql-cdc',
  > 'hostname' = '192.25.34.2',
  > 'port' = '3306',
  > 'username' = '*******',
  > 'password' = '*******',
  > 'database-name' = 'test',
  > 'table-name' = 't_user'
  > );
  [ERROR] Could not execute SQL statement. Reason:
  org.apache.flink.sql.parser.impl.ParseException: Encountered "DEFAULT" at line 3, column 13.
  Was expecting one of:
  "CONSTRAINT" ...
  "NOT" ...
  "NULL" ...
  "PRIMARY" ...
  "UNIQUE" ...
  "COMMENT" ...
  "METADATA" ...
  ")" ...
  "," ...
  "MULTISET" ...
  "ARRAY" ...
  Flink SQL>

Problem 4: Unknown identifier 'datetime'
Fix: use TIMESTAMP(3) instead; Flink SQL has no DATETIME type, and MySQL DATETIME maps to TIMESTAMP.

  Flink SQL> CREATE TABLE `t_user` (
  > `uid` int NOT NULL COMMENT 'user id',
  > `did` int COMMENT 'dept id',
  > `username` varchar(14) ,
  > `add_time` datetime ,
  > PRIMARY KEY (`uid`) NOT ENFORCED
  > ) WITH (
  > 'connector' = 'mysql-cdc',
  > 'hostname' = '192.25.34.2',
  > 'port' = '3306',
  > 'username' = '*******',
  > 'password' = '*******',
  > 'database-name' = 'test',
  > 'table-name' = 't_user'
  > );
  [ERROR] Could not execute SQL statement. Reason:
  org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'datetime'
  Flink SQL>

Created successfully:

  Flink SQL> CREATE TABLE `t_user` (
  > `uid` int NOT NULL COMMENT 'user id',
  > `did` int COMMENT 'dept id',
  > `username` varchar(14) ,
  > `add_time` TIMESTAMP(3),
  > PRIMARY KEY (`uid`) NOT ENFORCED
  > ) WITH (
  > 'connector' = 'mysql-cdc',
  > 'hostname' = '192.25.34.2',
  > 'port' = '3306',
  > 'username' = '*******',
  > 'password' = '*******',
  > 'database-name' = 'test',
  > 'table-name' = 't_user'
  > );
  [INFO] Execute statement succeed.
  Flink SQL>
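Taken together, problems 1 through 4 amount to four mechanical rewrites when porting a MySQL DDL to Flink SQL: drop the integer display width, drop AUTO_INCREMENT, drop DEFAULT NULL, and map datetime to TIMESTAMP(3). A minimal Python sketch of that conversion (illustrative regexes, not a full DDL parser):

```python
import re

def mysql_ddl_to_flink(ddl: str) -> str:
    """Apply the four fixes from this article. Illustrative only --
    a real converter would need a proper MySQL DDL parser."""
    ddl = re.sub(r"\bint\(\d+\)", "INT", ddl, flags=re.IGNORECASE)        # fix 1
    ddl = re.sub(r"\s+AUTO_INCREMENT\b", "", ddl, flags=re.IGNORECASE)    # fix 2
    ddl = re.sub(r"\s+DEFAULT\s+NULL\b", "", ddl, flags=re.IGNORECASE)    # fix 3
    ddl = re.sub(r"\bdatetime\b", "TIMESTAMP(3)", ddl, flags=re.IGNORECASE)  # fix 4
    return ddl

print(mysql_ddl_to_flink("`uid` int(11) NOT NULL AUTO_INCREMENT COMMENT 'user id',"))
```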

3. Create the target table
  Flink SQL> CREATE TABLE `ods_t_user` (
  > `uid` int NOT NULL COMMENT 'user id',
  > `did` int COMMENT 'dept id',
  > `username` varchar(14) ,
  > `add_time` TIMESTAMP(3),
  > PRIMARY KEY (`uid`) NOT ENFORCED
  > ) WITH (
  > 'connector' = 'jdbc',
  > 'url' = 'jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
  > 'driver' = 'com.mysql.cj.jdbc.Driver',
  > 'username' = '*******',
  > 'password' = '*******',
  > 'table-name' = 'ods_t_user'
  > );
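Three query parameters in the JDBC URL above are worth noting: useSSL=false disables TLS, allowPublicKeyRetrieval=true lets MySQL Connector/J fetch the server's public key when the account uses caching_sha2_password without SSL, and serverTimezone=UTC avoids timezone-detection errors. A quick Python sanity check that a URL carries them (the jdbc: prefix is stripped so the standard library's URL parser can read it):

```python
from urllib.parse import urlparse, parse_qs

url = "jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC"
# urlparse does not understand the "jdbc:" wrapper, so drop it first
params = parse_qs(urlparse(url.removeprefix("jdbc:")).query)
print(params)
```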
4. Load the source table into the target table

Error 1: Connector 'mysql-cdc' can only be used as a source. It cannot be used as a sink.

  Flink SQL> insert into t_user select * from ods_t_user;
  [ERROR] Could not execute SQL statement. Reason:
  org.apache.flink.table.api.ValidationException: Connector 'mysql-cdc' can only be used as a source. It cannot be used as a sink.
  Flink SQL>

Cause: the direction was reversed; the table being inserted into should be the target (sink) table:

  Flink SQL> insert into ods_t_user select * from t_user;
  [ERROR] Could not execute SQL statement. Reason:
  java.io.StreamCorruptedException: unexpected block data
  Flink SQL>

Error 2: unexpected block data
Solution: this error typically indicates connector JARs that do not match the Flink runtime version.
(1) Update the JARs in lib/ so the connector builds match Flink 1.17 (note flink-connector-jdbc-3.1.1-1.17 and flink-connector-mysql-cdc-2.4.1 below):

  [appuser@whtpjfscpt01 flink-1.17.1]$ ll lib/
  total 223320
  -rw-r--r-- 1 appuser appuser 196491 May 19 18:56 flink-cep-1.17.1.jar
  -rw-r--r-- 1 appuser appuser 542620 May 19 18:59 flink-connector-files-1.17.1.jar
  -rw-r--r-- 1 appuser appuser 266420 Sep 25 14:21 flink-connector-jdbc-3.1.1-1.17.jar
  -rw-r--r-- 1 appuser appuser 345711 Sep 25 15:45 flink-connector-mysql-cdc-2.4.1.jar
  -rw-r--r-- 1 appuser appuser 102472 May 19 19:02 flink-csv-1.17.1.jar
  -rw-r--r-- 1 appuser appuser 135975541 May 19 19:13 flink-dist-1.17.1.jar
  -rw-r--r-- 1 appuser appuser 8452171 Sep 19 10:20 flink-doris-connector-1.17-1.4.0.jar
  -rw-r--r-- 1 appuser appuser 180248 May 19 19:02 flink-json-1.17.1.jar
  -rw-r--r-- 1 appuser appuser 21043319 May 19 19:12 flink-scala_2.12-1.17.1.jar
  -rw-r--r-- 1 appuser appuser 15407424 May 19 19:13 flink-table-api-java-uber-1.17.1.jar
  -rw-r--r-- 1 appuser appuser 38191226 May 19 19:08 flink-table-planner-loader-1.17.1.jar
  -rw-r--r-- 1 appuser appuser 3146210 May 19 18:56 flink-table-runtime-1.17.1.jar
  -rw-r--r-- 1 appuser appuser 208006 May 17 18:07 log4j-1.2-api-2.17.1.jar
  -rw-r--r-- 1 appuser appuser 301872 May 17 18:07 log4j-api-2.17.1.jar
  -rw-r--r-- 1 appuser appuser 1790452 May 17 18:07 log4j-core-2.17.1.jar
  -rw-r--r-- 1 appuser appuser 24279 May 17 18:07 log4j-slf4j-impl-2.17.1.jar
  -rw-r--r-- 1 appuser appuser 2462364 Sep 19 11:30 mysql-connector-java-8.0.26.jar
  [appuser@whtpjfscpt01 flink-1.17.1]$

(2) Restart Flink

  [appuser@whtpjfscpt01 flink-1.17.1]$ bin/stop-cluster.sh
  Stopping taskexecutor daemon (pid: 41993) on host whtpjfscpt01.
  Stopping standalonesession daemon (pid: 41597) on host whtpjfscpt01.
  [appuser@whtpjfscpt01 flink-1.17.1]$ bin/start-cluster.sh
  Starting cluster.
  Starting standalonesession daemon on host whtpjfscpt01.
  Starting taskexecutor daemon on host whtpjfscpt01.
  [appuser@whtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh

(3) Re-run the statements

  Flink SQL> SET execution.checkpointing.interval = 3s;
  [INFO] Execute statement succeed.
  Flink SQL> CREATE TABLE `t_user` (
  > `uid` int NOT NULL COMMENT 'user id',
  > `did` int COMMENT 'dept id',
  > `username` varchar(14) ,
  > `add_time` TIMESTAMP(3),
  > PRIMARY KEY (`uid`) NOT ENFORCED
  > ) WITH (
  > 'connector' = 'mysql-cdc',
  > 'hostname' = '192.25.34.2',
  > 'port' = '3306',
  > 'username' = '*******',
  > 'password' = '*******',
  > 'database-name' = 'test',
  > 'table-name' = 't_user'
  > );
  [INFO] Execute statement succeed.
  Flink SQL> CREATE TABLE `ods_t_user` (
  > `uid` int NOT NULL COMMENT 'user id',
  > `did` int COMMENT 'dept id',
  > `username` varchar(14) ,
  > `add_time` TIMESTAMP(3),
  > PRIMARY KEY (`uid`) NOT ENFORCED
  > ) WITH (
  > 'connector' = 'jdbc',
  > 'url' = 'jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
  > 'driver' = 'com.mysql.cj.jdbc.Driver',
  > 'username' = '*******',
  > 'password' = '*******',
  > 'table-name' = 'ods_t_user'
  > );
  [INFO] Execute statement succeed.
  Flink SQL>
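Note that the session first runs `SET execution.checkpointing.interval = 3s;`. With the mysql-cdc source, a streaming INSERT only commits progress at checkpoints, so some checkpointing interval must be enabled. The same setting can be made cluster-wide instead of per session (a sketch; config key as of Flink 1.17):

```yaml
# conf/flink-conf.yaml -- cluster-wide equivalent of the per-session SET above
execution.checkpointing.interval: 3s
```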

 

(4) Successful execution

  Flink SQL> insert into ods_t_user select * from t_user;
  [INFO] Submitting SQL update statement to the cluster...
  [INFO] SQL update statement has been successfully submitted to the cluster:
  Job ID: c2e69d061f3777c031b0acb4ec03d13a


Error 3: the target table does not exist. Create it in the target MySQL database (demo) first:

  CREATE TABLE demo.ods_t_user (
  `uid` int(11) NOT NULL AUTO_INCREMENT COMMENT 'user id',
  `did` int(11) DEFAULT NULL COMMENT 'dept id',
  `username` varchar(14) DEFAULT NULL,
  `add_time` datetime DEFAULT NULL,
  PRIMARY KEY (`uid`)
  )

Insert a new record into the source table:

INSERT INTO test.t_user(did,username)values('3','test'); 

The target table syncs the data automatically.
