Environment
- OS: Linux (CentOS 7)
- Flink: 1.13.6
Go to Flink's lib directory:
cd flink-1.13.6/lib
Upload the required dependency JARs (all of them are easy to find online):
- flink-sql-connector-mysql-cdc-2.1.0.jar
- mysql-connector-java-8.0.13.jar
- flink-sql-connector-postgres-cdc-1.2.0.jar
- postgresql-42.6.0.jar
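Before starting the client, it can help to confirm that all four JARs actually landed in the lib directory. The following is a minimal sketch: the helper name `missing_jars` and the `REQUIRED_JARS` variable are illustrative, not part of Flink, and the lib path in the example is an assumption about where Flink was unpacked.

```shell
# Prints the name of every expected JAR that is NOT present in the given
# directory. Prints nothing when all of them are there.
missing_jars() {
  local lib_dir="$1"
  shift
  local jar
  for jar in "$@"; do
    if [ ! -f "$lib_dir/$jar" ]; then
      echo "$jar"
    fi
  done
}

# The four JARs listed above (hypothetical variable name).
REQUIRED_JARS="flink-sql-connector-mysql-cdc-2.1.0.jar mysql-connector-java-8.0.13.jar flink-sql-connector-postgres-cdc-1.2.0.jar postgresql-42.6.0.jar"

# Example (the path is an assumption; adjust it to your installation):
# missing_jars "flink-1.13.6/lib" $REQUIRED_JARS
```

Any name printed by the example call is a JAR that still has to be uploaded.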
Start the Flink SQL client (run from the directory that contains flink-1.13.6):
./flink-1.13.6/bin/sql-client.sh
1. PostgreSQL -> PostgreSQL
- -- Mapping table in Flink for the PostgreSQL source
- CREATE TABLE cdc_pg_source (
- id INT,
- age INT,
- name STRING
- ) WITH (
- 'connector' = 'postgres-cdc',
- 'hostname' = '10.254.21.3',
- 'port' = '54432',
- 'database-name' = 'postgres',
- 'schema-name' = 'public',
- 'username' = 'gpadmin',
- 'password' = 'xxxxxxx',
- 'table-name' = 'cdc_pg_source',
- 'decoding.plugin.name' = 'pgoutput',
- 'debezium.slot.name' = 'cdc_pg_source');
-
- -- Mapping table in Flink for the PostgreSQL sink
- CREATE TABLE cdc_pg_sink (
- id INT,
- age INT,
- name STRING,
- PRIMARY KEY (id) NOT ENFORCED
- ) WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:postgresql://10.254.21.3:54432/postgres',
- 'username' = 'gpadmin',
- 'password' = 'xxxxxx',
- 'table-name' = 'cdc_pg_sink',
- 'sink.buffer-flush.max-rows' = '1');
-
- -- flink job
- INSERT INTO cdc_pg_sink select * from cdc_pg_source;
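The source table above uses the `pgoutput` decoding plugin, which relies on PostgreSQL logical decoding, so the server has to run with `wal_level = logical`. The sketch below checks that setting before the job is submitted. The helper name `wal_level_ok` is hypothetical, and the commented `psql` call assumes that `psql` is installed and that the connection values from the DDL are reachable from your shell.

```shell
# Returns 0 only when the reported wal_level allows logical decoding.
# The argument is the raw output of "SHOW wal_level;".
wal_level_ok() {
  local level
  level=$(printf '%s' "$1" | tr -d '[:space:]')
  if [ "$level" = "logical" ]; then
    return 0
  fi
  echo "wal_level is '$level'; set wal_level = logical in postgresql.conf and restart PostgreSQL" >&2
  return 1
}

# Example (host, port, user and database are copied from the DDL above):
# wal_level_ok "$(psql -h 10.254.21.3 -p 54432 -U gpadmin -d postgres -Atc 'SHOW wal_level;')"
```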
2. MySQL -> MySQL
- CREATE TABLE t_test (
- id bigint,
- username string,
- password string,
- create_time time,
- PRIMARY KEY (id) NOT ENFORCED
- ) WITH (
- 'connector' = 'mysql-cdc',
- 'hostname' = '10.252.92.4',
- 'port' = '3306',
- 'database-name' = 'flink_cdc_test',
- 'username' = 'root',
- 'password' = 'xxxx',
- 'table-name' = 't_test'
- );
-
- CREATE TABLE t_test_ods (
- id bigint primary key not enforced,
- username string,
- password string,
- create_time time
- ) WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:mysql://10.252.92.4:3306/flink_cdc_test_ods',
- 'username' = 'root',
- 'password' = 'xxxx',
- 'table-name' = 't_test',
- 'sink.buffer-flush.max-rows' = '1'
- );
-
- insert into t_test_ods select * from t_test;
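The `mysql-cdc` connector reads the MySQL binlog through Debezium, which expects the binlog to be enabled with `binlog_format = ROW` and `binlog_row_image = FULL`. As a rough sketch, the helper below (the name `check_binlog_settings` is hypothetical) reads tab-separated `name value` pairs, such as those printed by the `mysql` client in batch mode, and reports any setting that differs.

```shell
# Reads "name<TAB>value" lines on stdin and prints every binlog setting that
# does not match the expected value. Exit status is 0 when all three match.
check_binlog_settings() {
  awk -F '\t' '
    BEGIN {
      bad = 0
      want["log_bin"] = "ON"
      want["binlog_format"] = "ROW"
      want["binlog_row_image"] = "FULL"
    }
    ($1 in want) {
      seen[$1] = 1
      if (toupper($2) != want[$1]) {
        printf "%s is %s, expected %s\n", $1, $2, want[$1]
        bad = 1
      }
    }
    END {
      for (k in want) {
        if (!(k in seen)) {
          printf "%s not reported\n", k
          bad = 1
        }
      }
      exit bad
    }'
}

# Example (host and user are copied from the DDL above; -p prompts for the password):
# mysql -h 10.252.92.4 -P 3306 -u root -p -N -B \
#   -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','binlog_row_image')" \
#   | check_binlog_settings
```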
1. Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath. or Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
- [ERROR] Could not execute SQL statement. Reason:
- org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
-
- Available factory identifiers are:
-
- blackhole
- datagen
- filesystem
- print
- [ERROR] Could not execute SQL statement. Reason:
- org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
-
- Available factory identifiers are:
-
- blackhole
- datagen
- filesystem
- print
Solution:
- <dependency>
- <groupId>com.alibaba.ververica</groupId>
- <artifactId>flink-connector-postgres-cdc</artifactId>
- <version>1.4.0</version>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba.ververica</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
- <version>1.4.0</version>
- </dependency>
Download the JAR, put it in the lib directory under the Flink installation, and then restart sql-client.
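The copy step can be sketched as a small helper. The function name `install_connector_jar` and the example paths are assumptions for illustration. For the SQL client, the self-contained `flink-sql-connector-*` JARs listed earlier are probably the ones you want in lib, and a cluster that is already running most likely needs a restart too before it picks up the new JAR.

```shell
# Copies one connector JAR into <flink_home>/lib and prints a restart reminder.
# Returns non-zero if the JAR or the lib directory does not exist.
install_connector_jar() {
  local jar_path="$1"
  local flink_home="$2"

  if [ ! -f "$jar_path" ]; then
    echo "JAR not found: $jar_path" >&2
    return 1
  fi
  if [ ! -d "$flink_home/lib" ]; then
    echo "No lib directory under: $flink_home" >&2
    return 1
  fi

  cp "$jar_path" "$flink_home/lib/" || return 1
  echo "Installed $(basename "$jar_path"); now restart sql-client"
}

# Example (both paths are assumptions):
# install_connector_jar ./flink-sql-connector-postgres-cdc-1.2.0.jar ./flink-1.13.6
# ./flink-1.13.6/bin/sql-client.sh
```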