当前位置:   article > 正文

Flink实战-(6)FlinkSQL实现CDC_could not find any factory for identifier 'postgre

could not find any factory for identifier 'postgres-cdc' that implements 'or

FlinkSQL说明

  • Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。
  • 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是我们熟知的 Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。
  • Flink SQL 是面向用户的 API 层,在我们传统的流式计算领域,比如 Storm、Spark Streaming 都会提供一些 Function 或者 Datastream API,用户通过 Java 或 Scala 写业务逻辑,这种方式虽然灵活,但有一些不足,比如具备一定门槛且调优较难,随着版本的不断更新,API 也出现了很多不兼容的地方。
  • 在 flink sql 中,对表名、字段名、函数名等是严格区分大小写的,为了兼容 hive 等其他仓库,建议建表时,表名和字段名都采用下划线连接单词的方式,以避免大小写问题。比如 hive ,是不区分大小写的,所有大写字母最终都会被系统转化为小写字母,此时使用 flink sql 去读写 hive ,出现大写字母时,会出现找不到表或字段的错误。关键字是不区分大小写的,比如 insert、select、create等。flink sql 中所有的字符串常量都需要使用英文单引号括起来,不要使用英文双引号以及中文符号。

前期准备

依赖的环境

  1. 环境:Linux(Centos7)
  2. Flink : 1.13.6

进入Flink的lib目录

cd flink-1.13.6/lib

上传相关的依赖包,这几个包在网上很容易找到

  1. flink-sql-connector-mysql-cdc-2.1.0.jar
  2. mysql-connector-java-8.0.13.jar
  3. flink-sql-connector-postgres-cdc-1.2.0.jar
  4. postgresql-42.6.0.jar

启动 Flink客户端

./flink-1.13.1/bin/sql-client.sh

Flink-SQL脚本

1、postgresql ->postgresql

  1. -- pg中映射表,source
  2. CREATE TABLE cdc_pg_source (
  3. id INT,
  4. age INT,
  5. name STRING
  6. ) WITH (
  7. 'connector' = 'postgres-cdc',
  8. 'hostname' = '10.254.21.3',
  9. 'port' = '54432',
  10. 'database-name' = 'postgres',
  11. 'schema-name' = 'public',
  12. 'username' = 'gpadmin',
  13. 'password' = 'xxxxxxx',
  14. 'table-name' = 'cdc_pg_source',
  15. 'decoding.plugin.name' = 'pgoutput',
  16. 'debezium.slot.name' = 'cdc_pg_source');
  17. -- pg中映射表,sink
  18. CREATE TABLE cdc_pg_sink (
  19. id INT,
  20. age INT,
  21. name STRING,
  22. PRIMARY KEY (id) NOT ENFORCED
  23. ) WITH (
  24. 'connector' = 'jdbc',
  25. 'url' = 'jdbc:postgresql://10.254.21.3:54432/postgres',
  26. 'username' = 'gpadmin',
  27. 'password' = 'xxxxxx',
  28. 'table-name' = 'cdc_pg_sink',
  29. 'sink.buffer-flush.max-rows' = '1');
  30. -- flink job
  31. INSERT INTO cdc_pg_sink select * from cdc_pg_source;

2、mysql -> mysql

  1. CREATE TABLE t_test (
  2. id bigint,
  3. username string,
  4. password string,
  5. create_time time
  6. ) WITH (
  7. 'connector' = 'mysql-cdc',
  8. 'hostname' = '10.252.92.4',
  9. 'port' = '3306',
  10. 'database-name' = 'flink_cdc_test',
  11. 'username' = 'root',
  12. 'password' = 'xxxx',
  13. 'table-name' = 't_test'
  14. );
  15. CREATE TABLE t_test_ods (
  16. id bigint primary key,
  17. username string,
  18. password string,
  19. create_time time
  20. ) WITH (
  21. 'connector' = 'jdbc',
  22. 'url' = 'jdbc:mysql://10.252.92.4:3306/flink_cdc_test_ods',
  23. 'username' = 'root',
  24. 'password' = 'xxxx',
  25. 'table-name' = 't_test',
  26. 'sink.buffer-flush.max-rows' = '1'
  27. );
  28. insert into t_test_ods select * from t_test;

遇到的问题

1、Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.  或  Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

  1. [ERROR] Could not execute SQL statement. Reason:
  2. org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
  3. Available factory identifiers are:
  4. blackhole
  5. datagen
  6. filesystem
  7. print
  1. [ERROR] Could not execute SQL statement. Reason:
  2. org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
  3. Available factory identifiers are:
  4. blackhole
  5. datagen
  6. filesystem
  7. print

解决方法:

  1. <dependency>
  2. <groupId>com.alibaba.ververica</groupId>
  3. <artifactId>flink-connector-postgres-cdc</artifactId>
  4. <version>1.4.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.alibaba.ververica</groupId>
  8. <artifactId>flink-connector-mysql-cdc</artifactId>
  9. <version>1.4.0</version>
  10. </dependency>

下载改JAR包,把它加到Flink下的lib路径下,然后重启sql-client;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/811557
推荐阅读
相关标签
  

闽ICP备14008679号