当前位置:   article > 正文

Spark中RDD转换成DataFrame的两种方式(分别用Java和scala实现)_spark的 reslistbeandataset.transform

spark的 reslistbeandataset.transform

        一:准备数据源

      在项目下新建一个student.txt文件,里面的内容为:

       

  1. 1,zhangsan,20
  2. 2,lisi,21
  3. 3,wanger,19
  4. 4,fangliu,18
      二:实现

     Java版:

    1.首先新建一个student的Bean对象,实现序列化和toString()方法,具体代码如下:

    

  1. package com.cxd.sql;
  2. import java.io.Serializable;
  3. @SuppressWarnings("serial")
  4. public class Student implements Serializable {
  5.     String sid;
  6.     String sname;
  7.     int sage;
  8.     public String getSid() {
  9.         return sid;
  10.     }
  11.     public void setSid(String sid) {
  12.         this.sid = sid;
  13.     }
  14.     public String getSname() {
  15.         return sname;
  16.     }
  17.     public void setSname(String sname) {
  18.         this.sname = sname;
  19.     }
  20.     public int getSage() {
  21.         return sage;
  22.     }
  23.     public void setSage(int sage) {
  24.         this.sage = sage;
  25.     }
  26.     @Override
  27.     public String toString() {
  28.         return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
  29.     }
  30.     
  31. }
         2.转换,具体代码如下

  1. package com.cxd.sql;
  2. import java.util.ArrayList;
  3. import org.apache.spark.SparkConf;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.sql.Dataset;
  6. import org.apache.spark.sql.Row;
  7. import org.apache.spark.sql.RowFactory;
  8. import org.apache.spark.sql.SaveMode;
  9. import org.apache.spark.sql.SparkSession;
  10. import org.apache.spark.sql.types.DataTypes;
  11. import org.apache.spark.sql.types.StructField;
  12. import org.apache.spark.sql.types.StructType;
  13. public class TxtToParquetDemo {
  14.     public static void main(String[] args) {
  15.         
  16.         SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
  17.         SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
  18.         reflectTransform(spark);//Java反射
  19.         dynamicTransform(spark);//动态转换
  20.     }
  21.     
  22.     /**
  23.      * 通过Java反射转换
  24.      * @param spark
  25.      */
  26.     private static void reflectTransform(SparkSession spark)
  27.     {
  28.         JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
  29.         
  30.         JavaRDD<Student> rowRDD = source.map(line -> {
  31.             String parts[] = line.split(",");
  32.             Student stu = new Student();
  33.             stu.setSid(parts[0]);
  34.             stu.setSname(parts[1]);
  35.             stu.setSage(Integer.valueOf(parts[2]));
  36.             return stu;
  37.         });
  38.         
  39.         Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
  40.         df.select("sid", "sname", "sage").
  41.         coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
  42.     }
  43.     /**
  44.      * 动态转换
  45.      * @param spark
  46.      */
  47.     private static void dynamicTransform(SparkSession spark)
  48.     {
  49.         JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
  50.         
  51.         JavaRDD<Row> rowRDD = source.map( line -> {
  52.             String[] parts = line.split(",");
  53.             String sid = parts[0];
  54.             String sname = parts[1];
  55.             int sage = Integer.parseInt(parts[2]);
  56.             
  57.             return RowFactory.create(
  58.                     sid,
  59.                     sname,
  60.                     sage
  61.                     );
  62.         });
  63.         
  64.         ArrayList<StructField> fields = new ArrayList<StructField>();
  65.         StructField field = null;
  66.         field = DataTypes.createStructField("sid", DataTypes.StringType, true);
  67.         fields.add(field);
  68.         field = DataTypes.createStructField("sname", DataTypes.StringType, true);
  69.         fields.add(field);
  70.         field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);
  71.         fields.add(field);
  72.         
  73.         StructType schema = DataTypes.createStructType(fields);
  74.         
  75.         Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
  76.         df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
  77.         
  78.         
  79.     }
  80.     
  81. }

     scala版本:

    

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType

object RDD2Dataset {
  
  case class Student(id:Int,name:String,age:Int)
  def main(args:Array[String])
  {
    
    val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
    import spark.implicits._
    reflectCreate(spark)
    dynamicCreate(spark)
  }
  
 /**
	 * 通过Java反射转换
	 * @param spark
	 */
  private def reflectCreate(spark:SparkSession):Unit={
    import spark.implicits._
    val stuRDD=spark.sparkContext.textFile("student2.txt")
    //toDF()为隐式转换
    val stuDf=stuRDD.map(_.split(",")).map(parts⇒Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF()
    //stuDf.select("id","name","age").write.text("result") //对写入文件指定列名
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf=spark.sql("select name from student where age<20")
    //nameDf.write.text("result") //将查询结果写入一个文件
    nameDf.show()
  }
  
  /**
	 * 动态转换
	 * @param spark
	 */
  private def dynamicCreate(spark:SparkSession):Unit={
    val stuRDD=spark.sparkContext.textFile("student.txt")
    import spark.implicits._
    val schemaString="id,name,age"
    val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema=StructType(fields)
    val rowRDD=stuRDD.map(_.split(",")).map(parts⇒Row(parts(0),parts(1),parts(2)))
    val stuDf=spark.createDataFrame(rowRDD, schema)
        stuDf.printSchema()
    val tmpView=stuDf.createOrReplaceTempView("student")
    val nameDf=spark.sql("select name from student where age<20")
    //nameDf.write.text("result") //将查询结果写入一个文件
    nameDf.show()
  }
}
     注:1.上面代码全都已经测试通过,测试的环境为spark2.1.0,jdk1.8。

             2.此代码不适用于spark2.0以前的版本。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/589171
推荐阅读
相关标签
  

闽ICP备14008679号