当前位置:   article > 正文

RDD转换成DataFrame的两种方式(分别用Java和scala实现)_javardd转dataframe

javardd转dataframe

一:准备数据源

  在项目下新建一个student.txt文件,里面的内容为:
  • 1
1,zhangsan,20

2,lisi,21

3,wanger,19

4,fangliu,18

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

二:实现

Java版:

1.首先新建一个student的Bean对象,实现序列化和toString()方法,具体代码如下:
  • 1
import java.io.Serializable;  

public class Student implements Serializable {  

    String sid;  
    String sname;  
    int sage;  
    public String getSid() {  
        return sid;  
    }  
    public void setSid(String sid) {  
        this.sid = sid;  
    }  
    public String getSname() {  
        return sname;  
    }  
    public void setSname(String sname) {  
        this.sname = sname;  
    }  
    public int getSage() {  
        return sage;  
    }  
    public void setSage(int sage) {  
        this.sage = sage;  
    }  
    @Override  
    public String toString() {  
        return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";  
    }  

}  

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

2.转换,具体代码如下

package com.cxd.sql;  

import java.util.ArrayList;  

import org.apache.spark.SparkConf;  
import org.apache.spark.api.java.JavaRDD;  
import org.apache.spark.sql.Dataset;  
import org.apache.spark.sql.Row;  
import org.apache.spark.sql.RowFactory;  
import org.apache.spark.sql.SaveMode;  
import org.apache.spark.sql.SparkSession;  
import org.apache.spark.sql.types.DataTypes;  
import org.apache.spark.sql.types.StructField;  
import org.apache.spark.sql.types.StructType;  

public class TxtToParquetDemo {  

    public static void main(String[] args) {  

        SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");  
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();  

        reflectTransform(spark);//Java反射  
        dynamicTransform(spark);//动态转换  
    }  

    /** 
     * 通过Java反射转换 
     * @param spark 
     */  
    private static void reflectTransform(SparkSession spark)  
    {  
        JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();  

        JavaRDD<Student> rowRDD = source.map(line -> {  
            String parts[] = line.split(",");  

            Student stu = new Student();  
            stu.setSid(parts[0]);  
            stu.setSname(parts[1]);  
            stu.setSage(Integer.valueOf(parts[2]));  
            return stu;  
        });  

        Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);  
        df.select("sid", "sname", "sage").  
        coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");  
    }  
    /** 
     * 动态转换 
     * @param spark 
     */  
    private static void dynamicTransform(SparkSession spark)  
    {  
        JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();  

        JavaRDD<Row> rowRDD = source.map( line -> {  
            String[] parts = line.split(",");  
            String sid = parts[0];  
            String sname = parts[1];  
            int sage = Integer.parseInt(parts[2]);  

            return RowFactory.create(  
                    sid,  
                    sname,  
                    sage  
                    );  
        });  

        ArrayList<StructField> fields = new ArrayList<StructField>();  
        StructField field = null;  
        field = DataTypes.createStructField("sid", DataTypes.StringType, true);  
        fields.add(field);  
        field = DataTypes.createStructField("sname", DataTypes.StringType, true);  
        fields.add(field);  
        field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);  
        fields.add(field);  

        StructType schema = DataTypes.createStructType(fields);  

        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);  
        df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");  


    }  

}  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87

scala版本:

import org.apache.spark.sql.SparkSession  
import org.apache.spark.sql.types.StringType  
import org.apache.spark.sql.types.StructField  
import org.apache.spark.sql.types.StructType  
import org.apache.spark.sql.Row  
import org.apache.spark.sql.types.IntegerType  

object RDD2Dataset {  

  case class Student(id:Int,name:String,age:Int)  
  def main(args:Array[String])  
  {  

    val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()  
    import spark.implicits._  
    reflectCreate(spark)  
    dynamicCreate(spark)  
  }  

 /**  
     * 通过Java反射转换  
     * @param spark  
     */  
  private def reflectCreate(spark:SparkSession):Unit={  
    import spark.implicits._  
    val stuRDD=spark.sparkContext.textFile("student2.txt")  
    //toDF()为隐式转换  
    val stuDf=stuRDD.map(_.split(",")).map(parts⇒Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF()  
    //stuDf.select("id","name","age").write.text("result") //对写入文件指定列名  
    stuDf.printSchema()  
    stuDf.createOrReplaceTempView("student")  
    val nameDf=spark.sql("select name from student where age<20")  
    //nameDf.write.text("result") //将查询结果写入一个文件  
    nameDf.show()  
  }  

  /**  
     * 动态转换  
     * @param spark  
     */  
  private def dynamicCreate(spark:SparkSession):Unit={  
    val stuRDD=spark.sparkContext.textFile("student.txt")  
    import spark.implicits._  
    val schemaString="id,name,age"  
    val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))  
    val schema=StructType(fields)  
    val rowRDD=stuRDD.map(_.split(",")).map(parts⇒Row(parts(0),parts(1),parts(2)))  
    val stuDf=spark.createDataFrame(rowRDD, schema)  
        stuDf.printSchema()  
    val tmpView=stuDf.createOrReplaceTempView("student")  
    val nameDf=spark.sql("select name from student where age<20")  
    //nameDf.write.text("result") //将查询结果写入一个文件  
    nameDf.show()  
  }  
} 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

注:1.上面代码全都已经测试通过,测试的环境为spark2.1.0,jdk1.8。
2.此代码不适用于spark2.0以前的版本。
文章转载自https://blog.csdn.net/u010592112/article/details/73730796
考试不用担心了​

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号