当前位置:   article > 正文

Spark:DataFrame使用_scala spark dataframe contains 函数

scala spark dataframe contains 函数

Spark SQL

Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是 DataFrame

DataFrame=RDD+Schema

它其实和关系型数据库中的表非常类似,RDD可以认为是表中的数据,Schema是表结构信息。 DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据 库,以及RDD

Spark1.3出现的 DataFrame ,Spark1.6出现了 DataSet ,在Spark2.0中两者统一,DataFrame等于 DataSet[Row]

SparkSession

要使用Spark SQL,首先需要创建一个SpakSession对象
SparkSession中包含了SparkContextSqlContext
所以说想通过SparkSession来操作RDD的话需要先通过它来获取SparkContext
这个SqlContext是使用sparkSQL操作hive的时候会用到的

创建DataFrame

使用SparkSession,可以从RDD、HIve表或者其它数据源创建DataFrame
那下面我们来使用JSON文件来创建一个DataFrame

想要使用spark-sql需要先添加spark-sql的依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.3</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

使用案例
Scala版本

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SqlDemoScala {

  	def main(args: Array[String]): Unit = {
	    val conf = new SparkConf()
	        .setMaster("local")
	
	    //创建SparkSession对象,里面包含SparkContext和SqlContext
	    val sparkSession = SparkSession.builder()
	      .appName("SqlDemoScala")
	      .config(conf)
	      .getOrCreate()
	
	    //读取json文件,获取DataFrame
	    val stuDf = sparkSession.read.json("~/student.json")
	
	    //查看DataFrame中的数据
	    stuDf.show()
	
	    sparkSession.stop()
  	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

Java版本

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;


public class SqlDemoJava {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");

        //创建SparkSession对象,里面包含SparkContext和SqlContext
        SparkSession sparkSession = SparkSession.builder()
                .appName("SqlDemoJava")
                .config(conf)
                .getOrCreate();
        //读取json文件,获取Dataset<Row>
        Dataset<Row> stuDf = sparkSession.read().json("~/student.json");

        stuDf.show();

        sparkSession.stop();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

由于DataFrame等于DataSet[Row],它们两个可以互相转换,所以创建哪个都是一样的
前面的scala代码默认创建的是DataFrame,java代码默认创建的是DataSet
尝试对他们进行转换

在Scala代码中将DataFrame转换为DataSet[Row],对后面的操作没有影响

// 将DataFrame转换为DataSet[Row]
val stuDf = sparkSession.read.json("~/student.json").as("stu")
  • 1
  • 2

在Java代码中将DataSet[Row]转换为DataFrame

// 将Dataset<Row>转换为DataFrame
Dataset<Row> stuDf = sparkSession.read().json("~/student.json").toDF();
  • 1
  • 2

DataFrame常用算子操作

DataFrame有以下常用算子:

  • printSchema() 打印表结构信息
  • show() 展示表数据
  • select() 类似于sql中的sql,获取字段信息
  • filter()、where() 查询/过滤条件
  • groupBy() 分组
  • count() 求和

Scala版本

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DataFrameOpScala {
  	def main(args: Array[String]): Unit = {
	    val conf = new SparkConf()
	      .setMaster("local")
	
	    //创建SparkSession对象,里面包含SparkContext和SqlContext
	    val sparkSession = SparkSession.builder()
	      .appName("DataFrameOpScala")
	      .config(conf)
	      .getOrCreate()
	
	    val stuDf = sparkSession.read.json("~/student.json")
	
	    //打印schema信息
	    stuDf.printSchema()
	
	    //默认显示所有数据,可以通过参数控制显示多少条
	    stuDf.show(2)
	
	    //查询数据中的指定字段信息
	    stuDf.select("name","age").show()
	
	    //在使用select的时候可以对数据做一些操作,需要添加隐式转换函数,否则语法报错
	    import sparkSession.implicits._
	    stuDf.select($"name",$"age"+1).show()
	
	    //对数据进行过滤,需要添加隐式转换函数,否则报错
	    stuDf.filter($"age">18).show()
	    //where底层调用的就是filter
	    stuDf.where($"age">18).show()
	
	    //对数据进行分组求和
	    stuDf.groupBy("age").count().show()
	
	    sparkSession.stop()
  	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

Java版本

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;


public class DataFrameOpJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");

        //创建SparkSession对象,里面包含SparkContext和SqlContext
        SparkSession sparkSession = SparkSession.builder()
                .appName("DataFrameOpJava")
                .config(conf)
                .getOrCreate();
        //读取json文件,获取Dataset<Row>
        Dataset<Row> stuDf = sparkSession.read().json("~/student.json");

        //打印schema信息
        stuDf.printSchema();

        //默认显示所有数据,可以通过参数控制显示多少条
        stuDf.show(2);

        //查询数据中的指定字段信息
        stuDf.select("name","age").show();

        //在使用select的时候可以对数据做一些操作,需要引入 import static org.apache.spark.sql.functions.col;
        stuDf.select(col("name"),col("age").plus(1)).show();

        //对数据进行过滤
        stuDf.filter(col("age").gt(18)).show();
        stuDf.where(col("age").gt(18)).show();

        //对数据进行分组求和
        stuDf.groupBy("age").count().show();

        sparkSession.stop();

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

这些就是针对DataFrame的一些常见的操作。
但是现在这种方式其实用起来还是不方便,只是提供了一些类似于可以操作表的算子,很对一些简单的查询还是可以的,但是针对一些复杂的操作,使用算子写起来就很麻烦了,所以我们希望能够直接支持用sql的方式执行,Spark SQL也是支持的

DataFrame的sql操作

想要实现直接支持sql语句查询DataFrame中的数据
需要两步操作

  1. 先将DataFrame注册为一个临时表
  2. 使用sparkSession中的sql函数执行sql语句

下面来看一个案例
Scala版本

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession


object DataFrameSqlScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")

    //创建SparkSession对象,里面包含SparkContext和SqlContext
    val sparkSession = SparkSession.builder()
      .appName("DataFrameSqlScala")
      .config(conf)
      .getOrCreate()

    val stuDf = sparkSession.read.json("~/student.json")

    //将DataFrame注册为一个临时表
    stuDf.createOrReplaceTempView("student")

    //使用sql查询临时表中的数据
    sparkSession.sql("select age,count(*) as num from student group by age")
      .show()

    sparkSession.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

java版本

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;


public class DataFrameSqlJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");

        //创建SparkSession对象,里面包含SparkContext和SqlContext
        SparkSession sparkSession = SparkSession.builder()
                .appName("DataFrameSqlJava")
                .config(conf)
                .getOrCreate();
        Dataset<Row> stuDf = sparkSession.read().json("D:\\student.json");

        //将Dataset<Row>注册为一个临时表
        stuDf.createOrReplaceTempView("student");

        //使用sql查询临时表中的数据
        sparkSession.sql("select age,count(*) as num from student group by age")
                .show();

        sparkSession.stop();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

RDD转换为DataFrame

为什么要将RDD转换为DataFrame?
在实际工作中我们可能会先把hdfs上的一些日志数据加载进来,然后进行一些处理,最终变成结构化的数据,希望对这些数据做一些统计分析,当然了我们可以使用spark中提供的transformation算子来实现,只不过会有一些麻烦,毕竟是需要写代码的,如果能够使用sql实现,其实是更加方便的。
所以可以针对我们前面创建的RDD,将它转换为DataFrame,这样就可以使用dataFrame中的一些算子或者直接写sql来操作数据了。

Spark SQL支持这两种方式将RDD转换为DataFrame

  1. 反射方式
  2. 编程方式
反射方式

这种方式是使用反射来推断RDD中的元数据。
基于反射的方式,代码比较简洁,也就是说当你在写代码的时候,已经知道了RDD中的元数据,这样的话使用反射这种方式是一种非常不错的选择。

Scala具有隐式转换的特性,所以spark sql的scala接口是支持自动将包含了case class的RDD转换为 DataFrame的

Scala版本

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object RddToDataFrameByReflectScala {
  	def main(args: Array[String]): Unit = {
	    val conf = new SparkConf()
	      .setMaster("local")
	
	    //创建SparkSession对象,里面包含SparkContext和SqlContext
	    val sparkSession = SparkSession.builder()
	      .appName("RddToDataFrameByReflectScala")
	      .config(conf)
	      .getOrCreate()
	
	    //获取SparkContext
	    val sc = sparkSession.sparkContext
	
	    val dataRDD = sc.parallelize(Array(("jack",18),("tom",20),("jessic",30)))
	
	    //基于反射直接将包含Student对象的dataRDD转换为dataFrame
	
	    //需要导入隐式转换
	    import sparkSession.implicits._
	    val stuDf = dataRDD.map(tup=>Student(tup._1,tup._2)).toDF()
	
	    //下面就可以通过DataFrame的方式操作dataRDD中的数据了
	    stuDf.createOrReplaceTempView("student")
	
	    //执行sql查询
	    val resDf = sparkSession.sql("select name,age from student where age > 18")
	
	    //将DataFrame转化为RDD
	    val resRDD = resDf.rdd
	    //从row中取数据,封装成student,打印到控制台
	    resRDD.map(row=>Student(row(0).toString,row(1).toString.toInt))
	      .collect()
	      .foreach(println(_))
	
	    //使用row的getAs方法,获取指定列名的值
	    resRDD.map(row=>Student(row.getAs[String]("name"),row.getAs[Int]("age")))
	      .collect()
	      .foreach(println(_))
	
	    sparkSession.stop()
  	}
}

//定义一个Student
case class Student(name: String,age: Int)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

Java版本

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;


public class RddToDataFrameReflectJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");

        //创建SparkSession对象,里面包含SparkContext和SqlContext
        SparkSession sparkSession = SparkSession.builder()
                .appName("RddToDataFrameReflectJava")
                .config(conf)
                .getOrCreate();


        //获取SparkContext
        //从sparkSession中获取的是scala中的SparkContext,所以需要转换成java中的SparkContext
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
        Tuple2<String, Integer> t1 = new Tuple2<>("jack", 18);
        Tuple2<String, Integer> t2 = new Tuple2<>("tom", 20);
        Tuple2<String, Integer> t3 = new Tuple2<>("jessic", 30);
        JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3));

        JavaRDD<Student> stuRDD = dataRDD.map(new Function<Tuple2<String, Integer>, Student>() {
            @Override
            public Student call(Tuple2<String, Integer> tup) throws Exception {
                return new Student(tup._1, tup._2);
            }
        });

        //注意:Student这个类必须声明为public,并且必须实现序列化
        Dataset<Row> stuDf = sparkSession.createDataFrame(stuRDD, Student.class);

        stuDf.createOrReplaceTempView("student");

        //执行sql查询
        Dataset<Row> resDf = sparkSession.sql("select name,age from student where age > 18");

        //将DataFrame转化为RDD,注意,这里需要转化为JavaRDD
        JavaRDD<Row> resRDD = resDf.javaRDD();
        //从row中取数据,封装成student,打印到控制台
        List<Student> resList = resRDD.map(new Function<Row, Student>() {
            @Override
            public Student call(Row row) throws Exception {
                //return new Student(row.getString(0),row.getInt(1));
                //通过getAs获取数据
                return new Student(row.getAs("name").toString(), Integer.parseInt(row.getAs("age").toString()));
            }
        }).collect();

        for(Student stu: resList){
            System.out.println(stu);
        }

        sparkSession.stop();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
编程方式

这种方式是通过编程接口来创建DataFrame,你可以在程序运行时动态构建一份元数据,就是Schema,然后将其应用到已经存在的RDD上。这种方式的代码比较冗长,但是如果在编写程序时,还不知道RDD的元数据,只有在程序运行时,才能动态得知其元数据,那么只能通过这种动态构建元数据的方式。
也就是说当case calss中的字段无法预先定义的时候,就只能用编程方式动态指定元数据了 下面看一个案例:

Scala版本

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}


object RddToDataFrameByProgramScala {
  	def main(args: Array[String]): Unit = {
	    val conf = new SparkConf()
	      .setMaster("local")
	
	    //创建SparkSession对象,里面包含SparkContext和SqlContext
	    val sparkSession = SparkSession.builder()
	      .appName("RddToDataFrameByReflectScala")
	      .config(conf)
	      .getOrCreate()
	
	    //获取SparkContext
	    val sc = sparkSession.sparkContext
	
	    val dataRDD = sc.parallelize(Array(("jack",18),("tom",20),("jessic",30)))
	
	    //组装rowRDD
	    val rowRDD = dataRDD.map(tup=>Row(tup._1,tup._2))
	    //指定元数据信息【这个元数据信息就可以动态从外部获取了,比较灵活】
	    val schema = StructType(Array(
	      StructField("name",StringType,true),
	      StructField("age",IntegerType,true)
	    ))
	
	    //组装DataFrame
	    val stuDf = sparkSession.createDataFrame(rowRDD,schema)
	
	    //下面就可以通过DataFrame的方式操作dataRDD中的数据了
	    stuDf.createOrReplaceTempView("student")
	
	    //执行sql查询
	    val resDf = sparkSession.sql("select name,age from student where age > 18")
	
	    //将DataFrame转化为RDD
	    val resRDD = resDf.rdd
	    resRDD.map(row=>(row(0).toString,row(1).toString.toInt))
	      .collect()
	      .foreach(println(_))
	
	    sparkSession.stop()
  	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

Java版本

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.sources.In;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;


public class RddToDataFrameByProgramJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");

        //创建SparkSession对象,里面包含SparkContext和SqlContext
        SparkSession sparkSession = SparkSession.builder()
                .appName("RddToDataFrameByProgramJava")
                .config(conf)
                .getOrCreate();


        //获取SparkContext
        //从sparkSession中获取的是scala中的SparkContext,所以需要转换成java中的SparkContext
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
        Tuple2<String, Integer> t1 = new Tuple2<>("jack", 18);
        Tuple2<String, Integer> t2 = new Tuple2<>("tom", 20);
        Tuple2<String, Integer> t3 = new Tuple2<>("jessic", 30);
        JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3));

        //组装RDD
        JavaRDD<Row> rowRDD = dataRDD.map(new Function<Tuple2<String, Integer>, Row>() {
            @Override
            public Row call(Tuple2<String, Integer> tup) throws Exception {
                return RowFactory.create(tup._1, tup._2);
            }
        });

        //指定元数据信息
        ArrayList<StructField> structFieldList = new ArrayList<>();
        structFieldList.add(DataTypes.createStructField("name",DataTypes.StringType,true));
        structFieldList.add(DataTypes.createStructField("age",DataTypes.IntegerType,true));

        StructType schema = DataTypes.createStructType(structFieldList);

        //构建DataFrame
        Dataset<Row> stuDf = sparkSession.createDataFrame(rowRDD, schema);

        stuDf.createOrReplaceTempView("student");
        Dataset<Row> resDf = sparkSession.sql("select name,age from student where age > 18");

        JavaRDD<Row> resRDD = resDf.javaRDD();

        List<Tuple2<String, Integer>> resList = resRDD.map(new Function<Row, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(row.getString(0), row.getInt(1));
            }
        }).collect();

        for(Tuple2<String,Integer> tup: resList){
            System.out.println(tup);
        }
        sparkSession.stop();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75

load和save操作

对于Spark SQL的DataFrame来说,无论是从什么数据源创建出来的DataFrame,都有一些共同的loadsave操作。

load操作主要用于加载数据,创建出DataFrame;
save操作,主要用于将DataFrame中的数据保存到文件中。

我们前面操作json格式的数据的时候好像没有使用load方法,而是直接使用的json方法,这是什么特殊用法吗?
查看json方法的源码会发现,它底层调用的是format和load方法

def json(paths: String*): DataFrame = format("json").load(paths : _*)
  • 1

我们如果使用原始的format和load方法加载数据,此时如果不指定format,则默认读取的数据源格式是parquet,也可以手动指定数据源格式。
Spark SQL内置了一些常见的数据源类型,比如json, parquet, jdbc, orc, csv, text
通过这个功能,就可以在不同类型的数据源之间进行转换了。

下面来看使用案例:
Scala版本

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object LoadAndSaveOpScala {

  	def main(args: Array[String]): Unit = {
	    val conf = new SparkConf()
	      .setMaster("local")
	
	    //创建SparkSession对象,里面包含SparkContext和SqlContext
	    val sparkSession = SparkSession.builder()
	      .appName("LoadAndSaveOpScala")
	      .config(conf)
	      .getOrCreate()
	
	    //读取数据
	    val stuDf = sparkSession.read.format("json").load("~/student.json")
	
	    //保存数据
	    stuDf.select("name","age")
	      .write
	      .format("csv")
	      .mode(SaveMode.Append)//追加
	      .save("hdfs://bigdata01:9000/out-save001")
	
	    sparkSession.stop()
  	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;


public class LoadAndSaveOpJava {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");

        //创建SparkSession对象,里面包含SparkContext和SqlContext
        SparkSession sparkSession = SparkSession.builder()
                .appName("LoadAndSaveOpJava")
                .config(conf)
                .getOrCreate();

        //读取数据
        Dataset<Row> stuDf = sparkSession.read().format("json").load("~/student.json");

        //保存数据
        stuDf.select("name","age")
                .write()
                .format("csv")
                .save("hdfs://bigdata01:9000/out-save002");

        sparkSession.stop();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

执行代码,查看结果,csv文件是使用逗号分隔的:

[root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /out-save001
Found 2 items
-rw-r--r--   3 yehua supergroup          0 2020-05-29 17:53 /out-save001/_SUC
-rw-r--r--   3 yehua supergroup         46 2020-05-29 17:53 /out-save001/part
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -cat /out-save001/part-00000-9bf82de6
jack,19
tom,18
jessic,27
hehe,18
haha,15
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
SaveMode

Spark SQL对于save操作,提供了不同的save mode。
主要用来处理,当目标位置已经有数据时应该如何处理。save操作不会执行锁操作,并且也不是原子 的,因此是有一定风险出现脏数据的。

SaveMode解释
SaveMode.ErrorIfExists (默认)如果目标位置已经存在数据,那么抛出一个异常
SaveMode.Append如果目标位置已经存在数据,那么将数据追加进去
SaveMode.Overwrite如果目标位置已经存在数据,那么就将已经存在的数据删除,然后再写入
SaveMode.Ignore如果目标位置已经存在数据,那么就忽略,不做任何操作

在LoadAndSaveOpScala中增加SaveMode的设置,重新执行,验证结果
将SaveMode设置为Append,如果目标已存在,则追加

stuDf.select("name","age")
  	 .write
	 .format("csv") 
	 .mode(SaveMode.Append) //追加 
	 .save("hdfs://bigdata01:9000/out-save001")
  • 1
  • 2
  • 3
  • 4
  • 5

执行之后的结果确实是追加到之前的结果目录中了

内置函数

Spark中提供了很多内置的函数

种类函数
聚合函数avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct
集合函数array_contains, explode, size
日期/时间函数datediff, date_add, date_sub, add_months, last_day, next_day, months_between, current_date, current_timestamp, date_format
数学函数abs, ceil, floor, round
混合函数if, isnull, md5, not, rand, when
字符串函数concat, get_json_object, length, reverse, split, upper
窗口函数denseRank, rank, rowNumber

这里面的函数和hive中的函数是类似的

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号