赞
踩
Central Repository: org/apache/flink/flink-connector-jdbc/3.1.1-1.16
Central Repository: com/ververica/flink-sql-connector-mysql-cdc/3.0.0
或者从mvnrepository.com下载
https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc
[appuser@whtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh
问题1:Encountered “(”
处理方法:去掉int(11)
,改为int
- Flink SQL> CREATE TABLE `t_user` (
- > `uid` int(11) NOT NULL AUTO_INCREMENT COMMENT 'user id',
- > `did` int(11) DEFAULT NULL COMMENT 'dept id',
- > `username` varchar(14) DEFAULT NULL,
- > `add_time` datetime DEFAULT NULL,
- > PRIMARY KEY (`uid`) NOT ENFORCED
- > ) WITH (
- > 'connector' = 'mysql-cdc',
- > 'hostname' = '192.25.34.2',
- > 'port' = '3306',
- > 'username' = '*******',
- > 'password' = '*******',
- > 'database-name' = 'test',
- > 'table-name' = 't_user'
- > );
- [ERROR] Could not execute SQL statement. Reason:
- org.apache.flink.sql.parser.impl.ParseException: Encountered "(" at line 2, column 12.
- Was expecting one of:
- "CONSTRAINT" ...
- "NOT" ...
- "NULL" ...
- "PRIMARY" ...
- "UNIQUE" ...
- "COMMENT" ...
- "METADATA" ...
- ")" ...
- "," ...
- "MULTISET" ...
- "ARRAY" ...
-
- Flink SQL>
问题2:Encountered “AUTO_INCREMENT”
处理方法:删除AUTO_INCREMENT
- Flink SQL> CREATE TABLE `t_user` (
- > `uid` int NOT NULL AUTO_INCREMENT COMMENT 'user id',
- > `did` int DEFAULT NULL COMMENT 'dept id',
- > `username` varchar(14) DEFAULT NULL,
- > `add_time` datetime DEFAULT NULL,
- > PRIMARY KEY (`uid`) NOT ENFORCED
- > ) WITH (
- > 'connector' = 'mysql-cdc',
- > 'hostname' = '192.25.34.2',
- > 'port' = '3306',
- > 'username' = '*******',
- > 'password' = '*******',
- > 'database-name' = 'test',
- > 'table-name' = 't_user'
- > );
- [ERROR] Could not execute SQL statement. Reason:
- org.apache.flink.sql.parser.impl.ParseException: Encountered "AUTO_INCREMENT" at line 2, column 22.
- Was expecting one of:
- "CONSTRAINT" ...
- "PRIMARY" ...
- "UNIQUE" ...
- "COMMENT" ...
- "METADATA" ...
- ")" ...
- "," ...
- "MULTISET" ...
- "ARRAY" ...
-
- Flink SQL>
问题3:Encountered “DEFAULT”
处理方法:删去DEFAULT
- Flink SQL> CREATE TABLE `t_user` (
- > `uid` int NOT NULL COMMENT 'user id',
- > `did` int DEFAULT NULL COMMENT 'dept id',
- > `username` varchar(14) DEFAULT NULL,
- > `add_time` datetime DEFAULT NULL,
- > PRIMARY KEY (`uid`) NOT ENFORCED
- > ) WITH (
- > 'connector' = 'mysql-cdc',
- > 'hostname' = '192.25.34.2',
- > 'port' = '3306',
- > 'username' = '*******',
- > 'password' = '*******',
- > 'database-name' = 'test',
- > 'table-name' = 't_user'
- > );
- [ERROR] Could not execute SQL statement. Reason:
- org.apache.flink.sql.parser.impl.ParseException: Encountered "DEFAULT" at line 3, column 13.
- Was expecting one of:
- "CONSTRAINT" ...
- "NOT" ...
- "NULL" ...
- "PRIMARY" ...
- "UNIQUE" ...
- "COMMENT" ...
- "METADATA" ...
- ")" ...
- "," ...
- "MULTISET" ...
- "ARRAY" ...
-
- Flink SQL>
问题4:Unknown identifier ‘datetime’
处理方法:改用 TIMESTAMP(3)
- Flink SQL> CREATE TABLE `t_user` (
- > `uid` int NOT NULL COMMENT 'user id',
- > `did` int COMMENT 'dept id',
- > `username` varchar(14) ,
- > `add_time` datetime ,
- > PRIMARY KEY (`uid`) NOT ENFORCED
- > ) WITH (
- > 'connector' = 'mysql-cdc',
- > 'hostname' = '192.25.34.2',
- > 'port' = '3306',
- > 'username' = '*******',
- > 'password' = '*******',
- > 'database-name' = 'test',
- > 'table-name' = 't_user'
- > );
- [ERROR] Could not execute SQL statement. Reason:
- org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'datetime'
-
- Flink SQL>
创建成功:
- Flink SQL> CREATE TABLE `t_user` (
- > `uid` int NOT NULL COMMENT 'user id',
- > `did` int COMMENT 'dept id',
- > `username` varchar(14) ,
- > `add_time` TIMESTAMP(3),
- > PRIMARY KEY (`uid`) NOT ENFORCED
- > ) WITH (
- > 'connector' = 'mysql-cdc',
- > 'hostname' = '192.25.34.2',
- > 'port' = '3306',
- > 'username' = '*******',
- > 'password' = '*******',
- > 'database-name' = 'test',
- > 'table-name' = 't_user'
- > );
- [INFO] Execute statement succeed.
-
- Flink SQL>
- Flink SQL> CREATE TABLE `ods_t_user` (
- > `uid` int NOT NULL COMMENT 'user id',
- > `did` int COMMENT 'dept id',
- > `username` varchar(14) ,
- > `add_time` TIMESTAMP(3),
- > PRIMARY KEY (`uid`) NOT ENFORCED
- > ) WITH (
- > 'connector' = 'jdbc',
- > 'url' = 'jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
- > 'driver' = 'com.mysql.cj.jdbc.Driver',
- > 'username' = '*******',
- > 'password' = '*******',
- > 'table-name' = 'ods_t_user'
- > );
错误1:Connector ‘mysql-cdc’ can only be used as a source. It cannot be used as a sink.
- Flink SQL> insert into t_user select * from ods_t_user;
- [ERROR] Could not execute SQL statement. Reason:
- org.apache.flink.table.api.ValidationException: Connector 'mysql-cdc' can only be used as a source. It cannot be used as a sink.
-
- Flink SQL>
原因:方向搞反了,插入表应该是目标表
- Flink SQL> insert into ods_t_user select * from t_user;
- [ERROR] Could not execute SQL statement. Reason:
- java.io.StreamCorruptedException: unexpected block data
-
- Flink SQL>
错误2:unexpected block data
解决办法:
(1)更新jar包如下
- [appuser@whtpjfscpt01 flink-1.17.1]$ ll lib/
- total 223320
- -rw-r--r-- 1 appuser appuser 196491 May 19 18:56 flink-cep-1.17.1.jar
- -rw-r--r-- 1 appuser appuser 542620 May 19 18:59 flink-connector-files-1.17.1.jar
- -rw-r--r-- 1 appuser appuser 266420 Sep 25 14:21 flink-connector-jdbc-3.1.1-1.17.jar
- -rw-r--r-- 1 appuser appuser 345711 Sep 25 15:45 flink-connector-mysql-cdc-2.4.1.jar
- -rw-r--r-- 1 appuser appuser 102472 May 19 19:02 flink-csv-1.17.1.jar
- -rw-r--r-- 1 appuser appuser 135975541 May 19 19:13 flink-dist-1.17.1.jar
- -rw-r--r-- 1 appuser appuser 8452171 Sep 19 10:20 flink-doris-connector-1.17-1.4.0.jar
- -rw-r--r-- 1 appuser appuser 180248 May 19 19:02 flink-json-1.17.1.jar
- -rw-r--r-- 1 appuser appuser 21043319 May 19 19:12 flink-scala_2.12-1.17.1.jar
- -rw-r--r-- 1 appuser appuser 15407424 May 19 19:13 flink-table-api-java-uber-1.17.1.jar
- -rw-r--r-- 1 appuser appuser 38191226 May 19 19:08 flink-table-planner-loader-1.17.1.jar
- -rw-r--r-- 1 appuser appuser 3146210 May 19 18:56 flink-table-runtime-1.17.1.jar
- -rw-r--r-- 1 appuser appuser 208006 May 17 18:07 log4j-1.2-api-2.17.1.jar
- -rw-r--r-- 1 appuser appuser 301872 May 17 18:07 log4j-api-2.17.1.jar
- -rw-r--r-- 1 appuser appuser 1790452 May 17 18:07 log4j-core-2.17.1.jar
- -rw-r--r-- 1 appuser appuser 24279 May 17 18:07 log4j-slf4j-impl-2.17.1.jar
- -rw-r--r-- 1 appuser appuser 2462364 Sep 19 11:30 mysql-connector-java-8.0.26.jar
- [appuser@whtpjfscpt01 flink-1.17.1]$
(2)重启flink
- [appuser@whtpjfscpt01 flink-1.17.1]$ bin/stop-cluster.sh
- Stopping taskexecutor daemon (pid: 41993) on host whtpjfscpt01.
- Stopping standalonesession daemon (pid: 41597) on host whtpjfscpt01.
- [appuser@whtpjfscpt01 flink-1.17.1]$ bin/start-cluster.sh
- Starting cluster.
- Starting standalonesession daemon on host whtpjfscpt01.
- Starting taskexecutor daemon on host whtpjfscpt01.
- [appuser@whtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh
(3)重新执行
- Flink SQL> SET execution.checkpointing.interval = 3s;
- [INFO] Execute statement succeed.
-
- Flink SQL> CREATE TABLE `t_user` (
- > `uid` int NOT NULL COMMENT 'user id',
- > `did` int COMMENT 'dept id',
- > `username` varchar(14) ,
- > `add_time` TIMESTAMP(3),
- > PRIMARY KEY (`uid`) NOT ENFORCED
- > ) WITH (
- > 'connector' = 'mysql-cdc',
- > 'hostname' = '192.25.34.2',
- > 'port' = '3306',
- > 'username' = '*******',
- > 'password' = '*******',
- > 'database-name' = 'test',
- > 'table-name' = 't_user'
- > );
- [INFO] Execute statement succeed.
-
- Flink SQL> CREATE TABLE `ods_t_user` (
- > `uid` int NOT NULL COMMENT 'user id',
- > `did` int COMMENT 'dept id',
- > `username` varchar(14) ,
- > `add_time` TIMESTAMP(3),
- > PRIMARY KEY (`uid`) NOT ENFORCED
- > ) WITH (
- > 'connector' = 'jdbc',
- > 'url' = 'jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
- > 'driver' = 'com.mysql.cj.jdbc.Driver',
- > 'username' = '*******',
- > 'password' = '*******',
- > 'table-name' = 'ods_t_user'
- > );
- [INFO] Execute statement succeed.
-
- Flink SQL>
(4)成功执行
- Flink SQL> insert into ods_t_user select * from t_user;
- [INFO] Submitting SQL update statement to the cluster...
- [INFO] SQL update statement has been successfully submitted to the cluster:
- Job ID: c2e69d061f3777c031b0acb4ec03d13a
错误3:无目标表
- CREATE TABLE demo.ods_t_user (
- `uid` int(11) NOT NULL AUTO_INCREMENT COMMENT 'user id',
- `did` int(11) DEFAULT NULL COMMENT 'dept id',
- `username` varchar(14) DEFAULT NULL,
- `add_time` datetime DEFAULT NULL,
- PRIMARY KEY (`uid`)
- )
源表添加新纪录
INSERT INTO test.t_user(did,username)values('3','test');
目标表自动同步数据
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。