赞
踩
目前跑通的读写MySQL的方式有三种:第一种是直接使用Flink自带的JDBCInputFormat和JDBCOutputFormat,第二种是自定义source和sink,第三种是通过DDL连接MySQL进行读写(但这种方式只在IDEA中调试通过,打包上传后运行报错,因此比较时只比较前两种)。
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-jdbc -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.10.0</version>
</dependency>
/**
 * Batch job that reads rows from MySQL with Flink's built-in {@code JDBCInputFormat}
 * and writes them back to MySQL with {@code JDBCOutputFormat}.
 */
public class ReadWriteMysqlByJDBC {

    /**
     * Driver class of mysql-connector-java 8.x. The legacy {@code com.mysql.jdbc.Driver}
     * name is deprecated in that version and only kept as a forwarding shim.
     */
    private static final String MYSQL_DRIVER = "com.mysql.cj.jdbc.Driver";

    /**
     * Builds and runs the batch pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();

        // Must correspond one-to-one with the columns returned by the query,
        // otherwise the values cannot be read.
        TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO};
        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

        // Read from MySQL.
        DataSet<Row> dataSource = fbEnv.createInput(JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername(MYSQL_DRIVER)
            .setDBUrl("jdbc:mysql://xxx/xxx")
            .setUsername("xxx")
            .setPassword("xxx")
            .setQuery("xxx")
            .setRowTypeInfo(rowTypeInfo)
            .finish());

        // Write to MySQL.
        dataSource.output(JDBCOutputFormat.buildJDBCOutputFormat()
            .setDrivername(MYSQL_DRIVER)
            .setDBUrl("jdbc:mysql://xxx/xxx")
            .setUsername("xxx")
            .setPassword("xxx")
            .setQuery("xxx")
            .finish());

        fbEnv.execute();
    }
}
设置并行度为2,show plan如下:
/**
 * Custom Flink source that runs a single JDBC query against MySQL and emits one
 * {@code SourceVo} per result row.
 *
 * <p>JDBC resources are opened in {@link #open(Configuration)} and released in
 * {@link #close()}; {@link #cancel()} only signals the read loop to stop.
 */
public class MysqlSource extends RichSourceFunction<SourceVo> {
    private static final Logger logger = LoggerFactory.getLogger(MysqlSource.class);

    // JDBC handles are created in open(), so they are excluded from function serialization.
    private transient Connection connection = null;
    private transient PreparedStatement ps = null;

    // Written by cancel() and read by the loop in run(); deliberately NOT transient so the
    // initial value 'true' survives serialization of the function.
    private volatile boolean running = true;

    /**
     * Loads the MySQL driver, opens the connection and prepares the query.
     *
     * @param parameters configuration passed through to the superclass
     * @throws Exception if the driver cannot be loaded or the connection/statement fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Driver class name of mysql-connector-java 8.x (com.mysql.jdbc.Driver is deprecated there).
        Class.forName("com.mysql.cj.jdbc.Driver");
        connection = DriverManager.getConnection("jdbc:mysql://xxx/xxx", "xxx", "xxx");
        ps = connection.prepareStatement("xxx");
    }

    /**
     * Executes the prepared query and emits every row until the result set is exhausted
     * or the source is cancelled.
     *
     * @param ctx context used to emit records
     * @throws Exception if the query or row access fails; the error is logged and rethrown
     *     so the failure is not silently reported as a finished source
     */
    @Override
    public void run(SourceContext<SourceVo> ctx) throws Exception {
        try (ResultSet resultSet = ps.executeQuery()) {
            while (running && resultSet.next()) {
                SourceVo vo = new SourceVo();
                vo.setxxx(resultSet.getString("xxx"));
                ctx.collect(vo);
            }
        } catch (Exception e) {
            // Pass the throwable as the trailing argument so SLF4J logs the stack trace.
            logger.error("runException", e);
            throw e;
        }
    }

    /** Asks the read loop in {@link #run(SourceContext)} to stop; resources are released in {@link #close()}. */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Releases the statement and then the connection, even if closing the statement fails.
     *
     * @throws Exception if closing a JDBC resource or the superclass fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                super.close();
            }
        }
    }
}
/**
 * Custom Flink sink that writes each incoming {@code SourceVo} to MySQL through a
 * prepared statement, executing one update per record.
 */
public class MysqlSink extends RichSinkFunction<SourceVo> {
    // JDBC handles are created in open(), so they are excluded from function serialization.
    private transient Connection connection;
    private transient PreparedStatement preparedStatement;

    /**
     * Loads the MySQL driver, opens the connection and prepares the write statement.
     *
     * @param parameters configuration passed through to the superclass
     * @throws Exception if the driver cannot be loaded or the connection/statement fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Driver class name of mysql-connector-java 8.x (com.mysql.jdbc.Driver is deprecated there).
        Class.forName("com.mysql.cj.jdbc.Driver");
        connection = DriverManager.getConnection("jdbc:mysql://xxx/xxx", "xxx", "xxx");
        preparedStatement = connection.prepareStatement("xxx");
    }

    /**
     * Releases the statement and then the connection, even if closing the statement fails.
     *
     * @throws Exception if closing a JDBC resource or the superclass fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                super.close();
            }
        }
    }

    /**
     * Binds the record to the prepared statement and executes it.
     *
     * @param value record to write
     * @param context sink context (unused)
     * @throws Exception if the write fails; propagated so a failed write is not
     *     silently dropped
     */
    @Override
    public void invoke(SourceVo value, Context context) throws Exception {
        preparedStatement.setString(1, value.getxxx());
        preparedStatement.executeUpdate();
    }
}
/**
 * Wires {@code MysqlSource} directly into {@code MysqlSink} and runs the streaming job.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the job fails to execute
 */
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(new MysqlSource())
        .addSink(new MysqlSink());
    env.execute();
}
设置并行度为2,show plan如下:
/**
 * Streaming Table API job that declares a JDBC source table and a JDBC sink table via
 * DDL, then filters rows from the source and inserts them into the sink.
 */
public class ReadWriteMysqlByDDL {

    /**
     * Registers both tables, builds the query and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if table registration, planning or execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Old planner in streaming mode, as selected by the settings builder below.
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv, fsSettings);

        // DDL for the table that is read from.
        String sourceTable = "CREATE TABLE sourceTable (\n" +
            " FTableName VARCHAR,\n" +
            " FECName VARCHAR\n" +
            ") WITH (\n" +
            " 'connector.type' = 'jdbc', -- 使用 jdbc connector\n" +
            " 'connector.url' = 'jdbc:mysql://xxx/xxx', -- jdbc url\n" +
            " 'connector.table' = 'xxx', -- 表名\n" +
            " 'connector.username' = 'xxx', -- 用户名\n" +
            " 'connector.password' = 'xxx', -- 密码\n" +
            " 'connector.write.flush.max-rows' = '1' -- 默认5000条,为了演示改为1条\n" +
            ")";
        tableEnvironment.sqlUpdate(sourceTable);

        // DDL for the table that is written to.
        String sinkTable = "CREATE TABLE sinkTable (\n" +
            " FID VARCHAR,\n" +
            " FRoomName VARCHAR\n" +
            ") WITH (\n" +
            " 'connector.type' = 'jdbc', -- 使用 jdbc connector\n" +
            " 'connector.url' = 'jdbc:mysql://xxx/xxx', -- jdbc url\n" +
            " 'connector.table' = 'xxx', -- 表名\n" +
            " 'connector.username' = 'xxx', -- 用户名\n" +
            // The closing quote after the password value was missing, which left the
            // SQL string literal unterminated and the statement invalid.
            " 'connector.password' = 'xxx', -- 密码\n" +
            " 'connector.write.flush.max-rows' = '1' -- 默认5000条,为了演示改为100条\n" +
            ")";
        tableEnvironment.sqlUpdate(sinkTable);

        String query = "SELECT FTableName as tableName,FECName as ecName FROM sourceTable";
        Table table = tableEnvironment.sqlQuery(query);
        // Keep only rows for the given table name and write a constant '1' plus ecName to the sink.
        table.filter("tableName === 'xxx'").select("'1',ecName").insertInto("sinkTable");

        streamEnv.execute();
    }
}
奇怪的是这种方式打包上传show plan的时候报错:
2020-05-22 12:09:48,198 WARN org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2020-05-22 12:09:48,201 WARN org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2020-05-22 12:09:48,201 WARN org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2020-05-22 12:09:48,372 ERROR org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. findAndCreateTableSource failed.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
at org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleRequest$1(JarPlanHandler.java:100)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.
at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:130)
at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
at org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:124)
at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:66)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
at connector.mysql.ReadWriteMysqlByDDL.main(ReadWriteMysqlByDDL.java:44)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 10 more
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
at org.apache.flink.table.catalog.DatabaseCalciteSchema.convertCatalogTable(DatabaseCalciteSchema.java:138)
at org.apache.flink.table.catalog.DatabaseCalciteSchema.convertTable(DatabaseCalciteSchema.java:97)
at org.apache.flink.table.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:86)
at java.util.Optional.map(Optional.java:215)
at org.apache.flink.table.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:76)
at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105)
at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1008)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:968)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3122)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3104)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3376)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1008)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:968)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:943)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:650)
at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:126)
... 20 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: Required context properties mismatch.
The following properties are requested:
connector.password=xxx
connector.table=xxx
connector.type=jdbc
connector.url=jdbc:mysql://xxx/xxx
connector.username=xxx
connector.write.flush.max-rows=100
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=FTableName
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=FECName
The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
... 47 more
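暂时不知道是什么原因。不过从异常信息看,被考虑的工厂列表中只有 Kafka 和 Csv 的 TableFactory,没有 JDBC 的 TableFactory,推测是打包后的 jar(或集群的 lib 目录)中缺少 flink-jdbc,或者打包时 META-INF/services 下的 TableFactory 服务文件被其他依赖覆盖,有待进一步验证。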
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。