当前位置:   article > 正文

flink写入 mysql_flink写入mysql的两种方式

flink写入mysql

方式一 通过JDBCOutputFormat

在flink中没有现成的用来写入MySQL的sink,但是flink提供了一个类,JDBCOutputFormat,通过这个类,如果你提供了jdbc的driver,则可以当做sink使用。

JDBCOutputFormat其实是flink的batch api,但也可以用来作为stream的api使用,社区也推荐通过这种方式来进行。

JDBCOutputFormat用起来很简单,只需要一个prepared statement,driver和database connection,就可以开始使用了。

JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()

.setDrivername("com.mysql.jdbc.Driver")

.setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")

.setQuery(query)

.finish();

如下的sql语句可以作为prepared statement:

String query = "INSERT INTO public.cases (caseid, tracehash) VALUES (?, ?)";

对应的表的结构:

CREATE TABLE cases

(

caseid VARCHAR(255),

tracehash VARCHAR(255)

);

但有一点要明确,JDBCOutputFormat只能处理Row,而Row是对prepared statement的参数的一个包装类。这意味着我们需要将流中的case转换为row,通过map就能做的。

DataStream cases = ...

DataStream rows = cases.map((MapFunction) aCase -> {

Row row = new Row(2); // our prepared statement has 2 parameters

row.setField(0, aCase.getId()); //first parameter is case ID

row.setField(1, aCase.getTraceHash()); //second paramater is tracehash

return row;

});

这样,我们就能添加sink了:

rows.writeUsingOutputFormat(jdbcOutput);

这样,你就可以将数据写入mysql了。

但是在你在流上附加了窗口之后,可能会得到下面的报错:

"Unknown column type for column %s. Best effort approach to set its value: %s."

因为窗口处理的类型,没有明确的类型定义,如下修改之前的定义,显式的指定类型:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/598304
推荐阅读
相关标签
  

闽ICP备14008679号