Flink-1.12
1. Add the dependency jars
flink-connector-jdbc_2.11-1.12.3.jar
mysql-connector-java-5.1.48.jar
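The post does not say where the two jars go. A minimal sketch, assuming they belong in the lib directory of the Flink installation (the usual location for connector jars that the cluster and SQL Client load at startup). The function name install_jars and both directory arguments are hypothetical and not from the original post.

```shell
# Sketch: copy connector jars into Flink's lib directory.
# src_dir and lib_dir are hypothetical paths; the original post does not
# say where the jars were downloaded or where Flink is installed.
install_jars() {
  src_dir="$1"
  lib_dir="$2"
  shift 2
  for jar in "$@"; do
    # Stop early if a jar is not where we expect it.
    if [ ! -f "$src_dir/$jar" ]; then
      echo "missing: $src_dir/$jar" >&2
      return 1
    fi
    cp "$src_dir/$jar" "$lib_dir/" || return 1
  done
}
```

For example, with Flink installed under a directory held in a FLINK_HOME variable (an assumption), the call might look like: install_jars "$HOME/downloads" "$FLINK_HOME/lib" flink-connector-jdbc_2.11-1.12.3.jar mysql-connector-java-5.1.48.jar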
2. Restart Flink
stop-cluster.sh
start-cluster.sh
Note: without a restart, the dependency jars above are not loaded, and queries fail with
java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat
or
Flink SQL> select * from t_sum;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
print
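Both errors indicate that the JDBC connector classes were not on the classpath. Before restarting, it may help to confirm that the jar files are actually present where Flink looks for them. A minimal sketch, assuming the jars are expected in the Flink lib directory; the function name missing_jars and its directory argument are hypothetical.

```shell
# Sketch: print the names of expected jars that are absent from a directory.
# lib_dir is a hypothetical path such as "$FLINK_HOME/lib".
# Returns 0 when every jar is present, 1 when at least one is missing.
missing_jars() {
  lib_dir="$1"
  shift
  status=0
  for jar in "$@"; do
    if [ ! -f "$lib_dir/$jar" ]; then
      echo "$jar"
      status=1
    fi
  done
  return "$status"
}
```

If the function prints nothing and the error persists, the likely remaining cause is the one named in the note above: the cluster and SQL Client were not restarted after the jars were added.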
3. Start the SQL Client
./sql-client.sh embedded
Create the table in MySQL
create table t_sum(
cnt int
);
Create the table in the Flink SQL Client
create table t_sum(
cnt int
)
with(
'connector' = 'jdbc',
'url' = 'jdbc:mysql://chb1:3306/flinktest',
'table-name' = 't_sum',
'username' = 'root',
'password' = '123456'
);
Insert data from the Flink SQL Client
Flink SQL> insert into t_sum values (10);
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 7cc4b890a7cb551dd31d38d0d4613c60
Flink SQL>
Check in MySQL whether the row was inserted
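The post does not show the query used for this check. One way to do it from the shell is sketched below, assuming the mysql command-line client is installed. The host, port, database, and user come from the JDBC URL and options in the Flink table definition; the MYSQL_BIN override and the function name query_t_sum are hypothetical conveniences, not part of the original post.

```shell
# Sketch: query t_sum in MySQL from the shell.
# MYSQL_BIN is a hypothetical override so a different client binary can be used.
MYSQL_BIN="${MYSQL_BIN:-mysql}"

query_t_sum() {
  # -p with no value attached makes the client prompt for the password;
  # flinktest is then read as the database name.
  "$MYSQL_BIN" -h chb1 -P 3306 -u root -p flinktest -e 'select * from t_sum;'
}
```

Calling query_t_sum prompts for the password and prints the current rows of t_sum.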
Insert data in MySQL
mysql> insert into t_sum value(20);
Query OK, 1 row affected (0.00 sec)
mysql>
The Flink SQL Client sees the new row as well
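Note: the default catalog is in-memory and has no persistent storage, so after you exit the Flink SQL Client and log in again, the table definition created there is gone. The rows stored in MySQL are not affected.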