
Spark SQL: Creating and Saving DataFrames, Reading and Writing MySQL, and Reading and Writing Hive


1. Creating and Saving DataFrames

package sql

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

case class Person(name: String, age: Long)

object CreateAndSaveDataFrame {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().getOrCreate()  // the SparkSession object
        import spark.implicits._  // enables RDD-to-DataFrame conversion and the SQL operations below
        // ### Creating DataFrames ###
        // 1. Create a DataFrame from a JSON file
        println("---------------------")
        val dfA = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
        dfA.show()
        dfA.printSchema()
        dfA.select(dfA("name")).show()  // 选择列
        dfA.filter(dfA("age") > 20).show()  // 条件过滤
        dfA.groupBy("age").count().show()  // 分组聚合
        dfA.sort(dfA("age").desc).show()  // 排序
        dfA.sort(dfA("age").desc, dfA("name").asc)  // 多列排序
        dfA.select(dfA("name").as("username"), dfA("age")).show()
        // 2、RDD 转换创建 DataFrame
        println("-------------------")
        // 2.1 利用反射机制推断 RDD schema 从而创建 df

        // Note: Person must be declared at the top level (as above), not here inside main;
        // a case class local to a method has no derivable Encoder, so the toDF() call below
        // would fail to compile with an error like "value toDF is not a member of ..."
        val dfB = spark.sparkContext.textFile(
            "file:///usr/local/spark/examples/src/main/resources/people.txt"
        ).map(_.split(",")).map(
            attributes => Person(attributes(0), attributes(1).trim.toInt)
        ).toDF()  // without "import spark.implicits._", toDF() is unavailable
        dfB.createOrReplaceTempView("people")  // register as a temporary view
        val dfC = spark.sql("select name,age from people where age > 20")
        dfC.map(r => "Name:"+r(0)+","+"Age:"+r(1)).show()
        // 2.2 Build a schema explicitly and apply it to an RDD of Rows
        val schema_str = "name age"
        val fields = schema_str.split(" ").map(
            field => StructField(field, StringType, nullable = true)
        )
        val schema = StructType(fields)
        val rdd = spark.sparkContext.textFile(
            "file:///usr/local/spark/examples/src/main/resources/people.txt"
        ).map(_.split(",")).map(
            attributes => Row(attributes(0), attributes(1).trim)
        )  // RDD[org.apache.spark.sql.Row]; a Row simply wraps an array of basic values (e.g. integers or strings)
        val dfD = spark.createDataFrame(rdd, schema)
        dfD.createOrReplaceTempView("people")
        val dfD_ = spark.sql("select name,age from people")
        dfD_.show()
        // 3. Create a DataFrame from a parquet file
        val dfE = spark.read.parquet("file:///usr/local/spark/examples/src/main/resources/users.parquet")

        // ### Saving DataFrames ###
        // Method 1
        dfA.select("name", "age").write.format("csv").save(
            "file:///usr/local/spark/mycode/newpeople.csv"
        )  // write.format() supports json, parquet, jdbc, orc, libsvm, csv, text (single-column only), and other formats
        // Method 2
        dfA.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")
        // Method 3
        dfE.write.parquet("file:///usr/local/spark/mycode/newpeople.parquet")
    }
}
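One practical note on the save calls above: by default, save() fails if the target path already exists (save mode "errorifexists"). A save mode can be set explicitly; a minimal sketch reusing the path from Method 1:

// "overwrite" replaces existing output; "append" and "ignore" are the other modes
dfA.select("name", "age").write.mode("overwrite").format("csv").save(
    "file:///usr/local/spark/mycode/newpeople.csv"
)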

Run result: (screenshots omitted)

Notes:
It is normal for a parquet file to look garbled when opened as plain text; parquet is a binary columnar format.
Do not define the case class inside the main method when combining import spark.implicits._ with toDF(); declare it at the top level, otherwise the toDF() call fails to compile. (screenshot omitted)
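A minimal sketch of a layout that avoids this error (the object name is hypothetical; the key point is that Person sits outside main):

import org.apache.spark.sql.SparkSession

// Correct: the case class is declared at the top level, so an Encoder can be derived for it
case class Person(name: String, age: Long)

object ToDfLayout {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().getOrCreate()
        import spark.implicits._
        // If Person were defined here inside main, toDF() below would not compile,
        // because no Encoder can be derived for a class local to a method
        val df = Seq(Person("Alice", 30L), Person("Bob", 25L)).toDF()
        df.show()
    }
}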

2. Reading from and Writing to MySQL

import org.apache.spark.sql.SparkSession

import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

object ReadWriteMysql {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().getOrCreate()  // the SparkSession object
        // Read
        val df = spark.read.format("jdbc").option(
            "url","jdbc:mysql://master:3306/sparktest"
        ).option("driver","com.mysql.jdbc.Driver").option(
            "user","root"
        ).option("password","Hive@2020").option(
            "dbtable","student"
        ).load()
        df.show()

        // Write
        val rdd = spark.sparkContext.parallelize(
            Array("5 ABC 12", "6 XYZ 102")
        ).map(_.split(" "))
        val schema = StructType(
            List(
                StructField("id", IntegerType, nullable = true),
                StructField("name", StringType, nullable = true),
                StructField("age", IntegerType, nullable = true),
            )
        )
        val rowRdd = rdd.map(s => Row(s(0).toInt, s(1).trim, s(2).toInt))
        val df_ = spark.createDataFrame(rowRdd, schema)
        val p = new Properties()
        p.put("user", "root")
        p.put("password", "Hive@2020")
        p.put("driver", "com.mysql.jdbc.Driver")
        df_.write.mode("append").jdbc(
            "jdbc:mysql://master:3306/sparktest", "sparktest.student", p
        )
    }
}

Run result: (screenshots omitted)
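Note that in either direction the MySQL connector jar must be on Spark's classpath (for example via the --jars option of spark-submit); with Connector/J 8+ the driver class is com.mysql.cj.jdbc.Driver rather than com.mysql.jdbc.Driver. The read side can also be written more compactly with the jdbc() shortcut on DataFrameReader, reusing the same Properties object as the write side; a minimal sketch with the connection details from the code above:

import java.util.Properties
import org.apache.spark.sql.SparkSession

object ReadMysqlAlt {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().getOrCreate()
        val p = new Properties()
        p.put("user", "root")
        p.put("password", "Hive@2020")
        p.put("driver", "com.mysql.jdbc.Driver")
        // read.jdbc(url, table, properties) is equivalent to the option-by-option form above
        val df = spark.read.jdbc("jdbc:mysql://master:3306/sparktest", "student", p)
        df.show()
    }
}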

3. Reading from and Writing to Hive

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

object ReadWriteHive {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("Spark Hive Test").enableHiveSupport().getOrCreate()  // enableHiveSupport() is required to read and write Hive tables
        // Read
        spark.sql("select * from sparktest.student").show()

        // Write
        val rdd = spark.sparkContext.parallelize(
            Array("5 ABC F 12", "6 XYZ M 102")
        ).map(_.split(" "))
        val schema = StructType(
            List(
                StructField("id", IntegerType, nullable = true),
                StructField("name", StringType, nullable = true),
                StructField("gender", StringType, nullable = true),
                StructField("age", IntegerType, nullable = true),
            )
        )
        val rowRdd = rdd.map(s => Row(s(0).toInt, s(1).trim, s(2).trim, s(3).toInt))
        val df_ = spark.createDataFrame(rowRdd, schema)
        df_.createOrReplaceTempView("temp")
        spark.sql("insert into sparktest.student select * from temp")
    }
}

Run result: (screenshots omitted)
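For enableHiveSupport() to work, Spark must be able to locate the Hive metastore configuration (typically a hive-site.xml under Spark's conf directory), and the target table sparktest.student must already exist with a matching schema. As an alternative to registering a temporary view and issuing an INSERT, the DataFrameWriter API can append directly; a minimal sketch under the same assumptions:

// Equivalent write without the temporary view: insertInto appends
// by column position into the existing Hive table
df_.write.mode("append").insertInto("sparktest.student")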
