当前位置:   article > 正文

SparkSQL的JDBC数据源_spark 原生表类型 org.apache.spark.sql.jdbc

spark 原生表类型 org.apache.spark.sql.jdbc

通过JDBC连接到关系型数据库,然后可以读取表的信息以及表中的数据,既可以将结果查询展示出来,也可以将查询到的结果重新写入到数据库中

直接上代码:

  1. package cn.ysjh0014.SparkSql
  2. import java.util.Properties
  3. import org.apache.spark.sql._
  4. object SparkSqlJdbc {
  5. def main(args: Array[String]): Unit = {
  6. val session: SparkSession = SparkSession.builder().appName("SparkSqlJdbc").master("local[*]").getOrCreate()
  7. import session.implicits._
  8. val resource: DataFrame = session.read.format("jdbc").options(
  9. Map("url" -> "jdbc:mysql://localhost:3306/lianxi?serverTimezone=GMT%2B8",
  10. "driver" -> "com.mysql.jdbc.Driver",
  11. "dbtable" -> "table1",
  12. "user" -> "root",
  13. "password" -> "root"
  14. )).load()
  15. // resource.printSchema() //打印出表的表头信息
  16. // resource.show()
  17. // val filterd: Dataset[Row] = resource.filter(r => {
  18. // r.getAs[Int](1) <= 15
  19. // })
  20. //
  21. // filterd.show()
  22. //lambda表达式
  23. val r: Dataset[Row] = resource.filter($"age" <= 15)
  24. // val r: Dataset[Row] = resource.where($"age" <= 15)
  25. val s: DataFrame = r.select($"id",$"name",$"age")
  26. //将查询到的数据再写入到数据库中,ignore是如果表存在不做任何操作,如果不存在则创建表并写入数据,append是在表中追加数据,overwrite是写覆盖数据
  27. val props = new Properties()
  28. props.put("user","root")
  29. props.put("password","root")
  30. s.write.mode("ignore").jdbc("jdbc:mysql://localhost:3306/bigdata?serverTimezone=GMT%2B8", "logs", props)
  31. // r.show()
  32. // s.show()
  33. session.close()
  34. }
  35. }

如上边的代码所示:

使用SparkSQL连接到Mysql数据库后既可以查询到表的表头信息,也可以查询到表中的数据,查询表中数据时可以使用lambda表达式,比较简洁,还可以将查询到的数据写入到数据库中

 

注意:

连接数据库时可能会报时区错误,这是因为数据库的时区不对,可以再jdbc中添加 ?serverTimezone=GMT%2B8 即可,例如我上边中的代码 "jdbc:mysql://localhost:3306/lianxi?serverTimezone=GMT%2B8" 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/593342
推荐阅读
相关标签
  

闽ICP备14008679号