当前位置:   article > 正文

Flink SQL --JDBC connector_flink-connector-jdbc

flink-connector-jdbc


基于 Flink-1.12

一、Table API & SQL

二、JDBC connector

2.1、配置

1、添加依赖jar

flink-connector-jdbc_2.11-1.12.3.jar
mysql-connector-java-5.1.48.jar
  • 1
  • 2

2、重启flink

stop-cluster.sh 
start-cluster.sh

  • 1
  • 2
  • 3

注意: 如果不重启的话,无法加载到上面的依赖jar
java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat
或者

Flink SQL> select * from t_sum;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

3、启动SQL Client

./sql-client.sh embedded
  • 1

2.2、创建表

2.2.1、MySQL中创建表

create table t_sum(
 cnt int
);
  • 1
  • 2
  • 3

2.2.2、Flink SQL Client创建表


create table t_sum(
 cnt int
)
with(
'connector' = 'jdbc',
'url' = 'jdbc:mysql://chb1:3306/flinktest',
'table-name' = 't_sum',
'username' = 'root',
'password' = '123456'
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

2.3、测试

2.3.1、在Flink SQL Client插入数据


Flink SQL> insert into t_sum values (10);
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 7cc4b890a7cb551dd31d38d0d4613c60

Flink SQL> 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

在这里插入图片描述

查看MySQL中是否插入数据
在这里插入图片描述

2.3.1、在MySQL插入数据

mysql> insert into t_sum value(20);
Query OK, 1 row affected (0.00 sec)

mysql> 
  • 1
  • 2
  • 3
  • 4

Flink SQL Client中也更新了
在这里插入图片描述

2.4、由于使用的是默认catalog,没有持久存储,所以 Flink SQL Client退出重新登录,数据没了

在这里插入图片描述

参考

JDBC SQL Connector

关注我的公众号【宝哥大数据】, 更多干货。。。

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/396707
推荐阅读
相关标签
  

闽ICP备14008679号