赞
踩
离线分析一直用的Spark,而且感觉很不错的。不过现在实时计算Flink显然比Spark更具有优势,而且Flink也支持离线分析,虽然还没有Spark这么强大,但是相信不远的将来完全在离线分析方面也有能力与其抗衡,因此测试了一下Flink 读取 Mysql
package dataset import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.java.io.jdbc.JDBCInputFormat import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala.BatchTableEnvironment import org.apache.flink.types.Row object Flink2Mysql { def main(args: Array[String]): Unit = { //设定执行环境 val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = BatchTableEnvironment.create(env) //通过创建JDBCInputFormat读取JDBC数据源 val jdbcDataSet: DataSet[Row] = env.createInput(JDBCInputFormat.buildJDBCInputFormat() .setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://localhost:3306/test") .setUsername("root") .setPassword("123") .setQuery("select * from app_als") .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo. DOUBLE_TYPE_INFO)) .finish() ) //将DataSet注册为表 tEnv.registerDataSet("tb", jdbcDataSet) //执行查询操作 val table = tEnv.sqlQuery("select * from tb") //把table转为DataSet tEnv.toDataSet[Row](table).print() } }
1,电视机,2999.0
2,电视机,2999.0
3,洗衣机,1000.0
4,洗衣机,1000.0
5,洗衣机,1000.0
6,冰箱,3999.0
7,冰箱,3999.0
8,空调,1999.0
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。