当前位置:   article > 正文

SparkSQL(6)——Spark SQL JDBC_org.apache.spark.sql.jdbc

org.apache.spark.sql.jdbc

Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame。
通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。

SparkSQL从MySQL中加载数据

package com.fgm.sparksql

import java.util.Properties

import org.apache.spark.sql.SparkSession

/**
  *通过sparksql读取mysql表中的数据
  *
  * @Auther: fgm
  */
object DataFromMysql {
  def main(args: Array[String]): Unit = {
    //创建对象
    val spark = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()
    //通过sparkSession对象加载mysql中的数据
    val url="jdbc:mysql://localhost:3306/spark"
    //定义表名
    val table="test"
    //properties
    val properties=new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","123")
    val jdbc = spark.read.jdbc(url,table,properties)

    jdbc.printSchema()
    jdbc.show()
    jdbc.createTempView("test")
    spark.sql("select * from test").show()
    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

SparkSQL向MySQL中写入数据

package com.fgm.sparksql

import java.util.Properties
import org.apache.spark.sql.SparkSession

/**
  *通过sparksql把结果数据写入到mysql表
  * @Auther: fgm
  */
case class User(val id:Int,val name:String,val age:Int)

object DataToMysql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataToMysql").master("local[2]").getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    //读取数据文件
    val RDD1 = sc.textFile("D:\\tmp\\user.txt").map(_.split(" "))
    //将RDD与样例类关联
    val userRDD = RDD1.map(x=>User(x(0).toInt,x(1),x(2).toInt))
    //构建DataFrame
    import spark.implicits._
    val df = userRDD.toDF()
    df.printSchema()
    df.show()

    df.createTempView("user")
    val result = spark.sql("select * from user where age >30")
    //定义表名
    val table="user"
    //将结果写入到mysql
    //定义数据库url
    val url="jdbc:mysql://localhost:3306/spark"
    //properties
    val properties=new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","123")

    result.write.mode("append").jdbc(url,table,properties)
  
    //再将数据库中的数据读取出来,检查是否写入成功,也可以进行其他相关操作
    //val jdbc=spark.read.jdbc(url,table,properties)
    //jdbc.show()
    
    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

D:\tmp\user.txt

1 zhangsan 20
2 lisi 29
3 wangwu 33
4 zhaoliu 30
5 hahaha 44
  • 1
  • 2
  • 3
  • 4
  • 5

未注释读取的代码时,数据如下:
在这里插入图片描述

并且查看数据库发现,新建的user表中已经有了数据。
在这里插入图片描述

注意:以上代码,都可以打成jar包之后在集群中运行。参数(如:文件url,以及table等,)可以通过args(0)等方式传入,不要写死在代码里。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/593315
推荐阅读
相关标签
  

闽ICP备14008679号