赞
踩
Flink 操作数据库并不像 Spark 那样简洁方便,思路上更接近用 Java 通过 JDBC 操作 MySQL。
pom依赖
<dependencies>
  <!-- All Flink artifacts are pinned to the same release (1.9.2): mixing
       flink-scala 1.7.2 with flink-jdbc 1.9.2 puts two different Flink
       versions on one classpath and risks binary incompatibilities. -->
  <!-- NOTE(review): running the job locally (from an IDE) may additionally
       require org.apache.flink:flink-clients_2.11 at the same version;
       confirm against your setup. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.9.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.9.2</version>
  </dependency>
  <!-- JDBC driver providing com.mysql.jdbc.Driver -->
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
  </dependency>
</dependencies>
读写Mysql代码
package FlinkMysql

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

/**
 * Minimal example of reading from and writing to MySQL with the Flink
 * DataSet API, using `JDBCInputFormat` and `JDBCOutputFormat`.
 */
object FlinkConnectMysql {

  /**
   * Entry point: prints the contents of table `t_offset`, then inserts one
   * sample row into table `ttt`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // MySQL connection parameters.
    // NOTE(review): credentials are hard-coded for demonstration purposes only.
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/selftest"
    val user = "root"
    val password = "123456"
    val sqlRead = "select * from t_offset"
    val sqlWrite = "insert into ttt(id,types) values(?,?)"

    // Read and print the source table. Named arguments are used because
    // readMysql and writeMysql take `url` and `driver` in different orders
    // and every one of these parameters is a String.
    readMysql(env, url = url, driver = driver, user = user, pwd = password, sql = sqlRead)

    // The data to insert into MySQL: one row matching the two placeholders
    // of `sqlWrite`.
    val value: DataSet[Row] = env.fromElements((11, "A,B,C,D,E.F.G"))
      .map(t => {
        val row = new Row(2)
        row.setField(0, t._1)
        row.setField(1, t._2)
        row
      })

    // writeMysql registers the sink AND executes the job. No further
    // env.execute() call is made here: with no new sink defined, a second
    // execute() fails with "No new data sinks have been defined since the
    // last execution".
    writeMysql(env, value, driver = driver, url = url, user = user, pwd = password, sql = sqlWrite)
  }

  /**
   * Runs `sql` against the database and prints the resulting rows to stdout.
   *
   * The row type is fixed to four columns (String, String, Int, Int), so the
   * query must return exactly those columns in that order.
   * NOTE(review): with `select *` this silently depends on the table schema;
   * confirm it matches, or list the columns explicitly in the query.
   *
   * `DataSet.print()` is an eager operation: it triggers execution of the
   * program itself, so no `env.execute()` is required after this method.
   *
   * @param env    Flink batch execution environment
   * @param url    JDBC connection URL
   * @param driver fully qualified JDBC driver class name
   * @param user   database user name
   * @param pwd    database password
   * @param sql    SELECT statement to run
   */
  def readMysql(env: ExecutionEnvironment, url: String, driver: String, user: String, pwd: String, sql: String): Unit = {
    val rowTypeInfo = new RowTypeInfo(
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO)

    val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername(driver)
      .setDBUrl(url)
      .setUsername(user)
      .setPassword(pwd)
      .setQuery(sql)
      .setRowTypeInfo(rowTypeInfo)
      .finish()

    val dataResult: DataSet[Row] = env.createInput(inputFormat)
    dataResult.print()
  }

  /**
   * Writes every row of `outputData` to the database using the parameterized
   * statement `sql`, then executes the job.
   *
   * Each row's fields are bound, in order, to the `?` placeholders of `sql`.
   * This method calls `env.execute`, so callers must not call it again unless
   * they have registered additional sinks.
   *
   * @param env        Flink batch execution environment
   * @param outputData rows to write
   * @param driver     fully qualified JDBC driver class name
   * @param url        JDBC connection URL
   * @param user       database user name
   * @param pwd        database password
   * @param sql        parameterized INSERT statement
   */
  def writeMysql(env: ExecutionEnvironment, outputData: DataSet[Row], driver: String, url: String, user: String, pwd: String, sql: String): Unit = {
    val outputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
      .setDrivername(driver)
      .setDBUrl(url)
      .setUsername(user)
      .setPassword(pwd)
      .setQuery(sql)
      .finish()

    outputData.output(outputFormat)
    env.execute("insert into mysql job")
    // Reached only if execute() returned without throwing.
    println("data write successfully")
  }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。