当前位置:   article > 正文

Flink 读取 Mysql_flink如何用jdbcinputformat 读取mysql表

flink如何用jdbcinputformat 读取mysql表

前言

离线分析一直用的Spark,而且感觉很不错的。不过现在实时计算Flink显然比Spark更具有优势,而且Flink也支持离线分析,虽然还没有Spark这么强大,但是相信不远的将来完全在离线分析方面也有能力与其抗衡,因此测试了一下Flink 读取 Mysql

代码

package dataset

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row

object Flink2Mysql {
  def main(args: Array[String]): Unit = {
    //设定执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = BatchTableEnvironment.create(env)

    //通过创建JDBCInputFormat读取JDBC数据源
    val jdbcDataSet: DataSet[Row] =
      env.createInput(JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("com.mysql.jdbc.Driver")
        .setDBUrl("jdbc:mysql://localhost:3306/test")
        .setUsername("root")
        .setPassword("123")
        .setQuery("select * from app_als")
    	.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO,
          BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.
          DOUBLE_TYPE_INFO))
        .finish()
      )

    //将DataSet注册为表
    tEnv.registerDataSet("tb", jdbcDataSet)
    //执行查询操作
    val table = tEnv.sqlQuery("select * from tb")
    //把table转为DataSet
    tEnv.toDataSet[Row](table).print()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

结果

1,电视机,2999.0
2,电视机,2999.0
3,洗衣机,1000.0
4,洗衣机,1000.0
5,洗衣机,1000.0
6,冰箱,3999.0
7,冰箱,3999.0
8,空调,1999.0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

后记

参考官网

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/1005197
推荐阅读
相关标签
  

闽ICP备14008679号