赞
踩
方式一 通过JDBCOutputFormat
在flink中没有现成的用来写入MySQL的sink,但是flink提供了一个类,JDBCOutputFormat,通过这个类,如果你提供了jdbc的driver,则可以当做sink使用。
JDBCOutputFormat其实是flink的batch api,但也可以用来作为stream的api使用,社区也推荐通过这种方式来进行。
JDBCOutputFormat用起来很简单,只需要一个prepared statement,driver和database connection,就可以开始使用了。
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
.setQuery(query)
.finish();
如下的sql语句可以作为prepared statement:
String query = "INSERT INTO public.cases (caseid, tracehash) VALUES (?, ?)";
对应的表的结构:
CREATE TABLE cases
(
caseid VARCHAR(255),
tracehash VARCHAR(255)
);
但有一点要明确,JDBCOutputFormat只能处理Row,而Row是对prepared statement的参数的一个包装类。这意味着我们需要将流中的case转换为row,通过map就能做的。
DataStream cases = ...
DataStream rows = cases.map((MapFunction) aCase -> {
Row row = new Row(2); // our prepared statement has 2 parameters
row.setField(0, aCase.getId()); //first parameter is case ID
row.setField(1, aCase.getTraceHash()); //second paramater is tracehash
return row;
});
这样,我们就能添加sink了:
rows.writeUsingOutputFormat(jdbcOutput);
这样,你就可以将数据写入mysql了。
但是在你在流上附加了窗口之后,可能会得到下面的报错:
"Unknown column type for column %s. Best effort approach to set its value: %s."
因为窗口处理的类型,没有明确的类型定义,如下修改之前的定义,显式的指定类型:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。