import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * @description:
 * Creating a DataFrame from an RDD, approach 1:
 * turn the RDD[T] into an RDD[case class] and it can be converted with toDF directly
 */
// sample record: 1,张飞,21,北京,80.0
case class Stu(id: Int, name: String, age: Int, city: String, score: Double)

object Demo2_CreateDF_1 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Load the raw data (no schema information) as an RDD
    val rdd: RDD[String] = spark.sparkContext.textFile("data/stu.txt")

    // Turn RDD[String] into RDD[Stu]
    val rddStu: RDD[Stu] = rdd
      .map(line => line.split(","))
      .map(arr => Stu(arr(0).toInt, arr(1), arr(2).toInt, arr(3), arr(4).toDouble))

    // An RDD of case class instances can be turned into a DataFrame directly
    val df = spark.createDataFrame(rddStu)
    df.printSchema()
    df.show()

    // A more concise (and more commonly used) alternative; it requires import spark.implicits._
    val df2: DataFrame = rddStu.toDF()

    spark.close()
  }
}
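As a side note, the same case-class-to-DataFrame conversion also works on an in-memory collection, which is handy for quick tests. A minimal sketch (it assumes it runs inside the same main method, with the Stu case class and import spark.implicits._ in scope; the record is just the sample row from the comment above):

// Illustrative only: build a DataFrame from a local Seq of case class instances
val localDF = Seq(Stu(1, "张飞", 21, "北京", 80.0)).toDF()
localDF.printSchema()
localDF.show()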
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * @description:
 * Creating a DataFrame from an RDD[tuple]
 */
object Demo3_CreateDF_2 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val spark = SparkSession
      .builder()
      .appName("demo3")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val rdd: RDD[String] = spark.sparkContext.textFile("data/stu.txt")

    // Turn RDD[String] into RDD[(f1, f2, f3, ...)]
    val rddTuple: RDD[(Int, String, Int, String, Double)] =
      rdd.map(_.split(",")).map(arr => (arr(0).toInt, arr(1), arr(2).toInt, arr(3), arr(4).toDouble))

    // The RDD of tuples can be converted to a DataFrame directly; the framework infers the structure from the tuple
    // val df = rddTuple.toDF()
    /**
     * root
     *  |-- _1: integer (nullable = false)
     *  |-- _2: string (nullable = true)
     *  |-- _3: integer (nullable = false)
     *  |-- _4: string (nullable = true)
     *  |-- _5: double (nullable = false)
     */

    // During the conversion, a custom set of column names can be supplied
    val df = rddTuple.toDF("id", "name", "age", "city", "score")

    // Print the schema
    df.printSchema()
    /**
     * root
     *  |-- id: integer (nullable = false)
     *  |-- name: string (nullable = true)
     *  |-- age: integer (nullable = false)
     *  |-- city: string (nullable = true)
     *  |-- score: double (nullable = false)
     */

    // Print the data
    df.show()

    spark.close()
  }
}
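Once the tuple-based DataFrame has meaningful column names, it can also be registered as a temporary view and queried with SQL. A minimal sketch (the view name and the aggregation query are illustrative, not part of the original demo; it assumes the df built above):

// Illustrative only: register the DataFrame as a temp view and query it with SQL
df.createOrReplaceTempView("stu")
spark.sql("select city, avg(score) as avg_score from stu group by city").show()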
/**
 * @description: a JavaBean used to encapsulate a stu record
 */
// sample record: 1,张飞,21,北京,80.0
public class JavaStu {
    int id;
    String name;
    int age;
    String city;
    double score;

    public JavaStu() {
    }

    public JavaStu(int id, String name, int age, String city, double score) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.city = city;
        this.score = score;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }

    public double getScore() { return score; }
    public void setScore(double score) { this.score = score; }

    @Override
    public String toString() {
        return "JavaStu{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                ", city='" + city + '\'' +
                ", score=" + score +
                '}';
    }
}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * @description:
 * Creating a DataFrame from an RDD[JavaBean]
 * (the bean class must provide get/set methods)
 */
object Demo4_CreateDF_3 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val spark = SparkSession
      .builder()
      .appName("demo4")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val rdd: RDD[String] = spark.sparkContext.textFile("data/stu.txt")

    // Turn RDD[String] into RDD[JavaStu]
    val rddJavaStu: RDD[JavaStu] = rdd.map(line => {
      val arr = line.split(",")
      new JavaStu(arr(0).toInt, arr(1), arr(2).toInt, arr(3), arr(4).toDouble)
    })

    // Turn RDD[JavaStu] into a DataFrame
    val df = spark.createDataFrame(rddJavaStu, classOf[JavaStu])
    df.printSchema()
    df.show()

    spark.close()
  }
}
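With the createDataFrame(rdd, classOf[JavaStu]) overload, the column names are derived from the bean's getters (id, name, age, city, score). A small illustrative query against those columns (not part of the original demo; it assumes the df built above):

// Illustrative only: query the bean-derived columns
df.select("name", "score").where("age >= 21").show()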
import scala.beans.BeanProperty

/**
 * @description: a Scala version of the bean class
 */
class ScalaStu(
  @BeanProperty val id: Int,
  @BeanProperty val name: String,
  @BeanProperty val age: Int,
  @BeanProperty val city: String,
  @BeanProperty val score: Double
)

object ScalaStu {
  def apply(
    id: Int,
    name: String,
    age: Int,
    city: String,
    score: Double
  ): ScalaStu = new ScalaStu(id, name, age, city, score)
}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Creating a DataFrame from an RDD[Scala bean]
 */
object Demo5_CreateDF_4 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val spark = SparkSession
      .builder()
      .appName("demo5")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val rdd: RDD[String] = spark.sparkContext.textFile("data/stu.txt")

    // Turn RDD[String] into RDD[ScalaStu]
    val rdd2: RDD[ScalaStu] = rdd.map(line => {
      val arr = line.split(",")
      val bean = ScalaStu(arr(0).toInt, arr(1), arr(2).toInt, arr(3), arr(4).toDouble)
      bean
    })

    // Create the DataFrame from RDD[ScalaStu]
    val df: DataFrame = spark.createDataFrame(rdd2, classOf[ScalaStu])
    df.printSchema()
    df.show()

    spark.close()
  }
}
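Because @BeanProperty generates JavaBean-style getters for the val fields, the same bean-class overload of createDataFrame works for the Scala class as well. An illustrative follow-up aggregation (not part of the original demo; it assumes the df built above):

// Illustrative only: aggregate on the bean-derived columns
df.groupBy("city").avg("score").show()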
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * @description:
 * Creating a DataFrame from an RDD[Row]
 * Row is Spark SQL's built-in, unified representation of a record;
 * whatever the source looks like, the data ends up wrapped in Rows inside a DataFrame
 */
object Demo6_CreateDF_5 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val spark = SparkSession
      .builder()
      .appName("demo6")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val rdd: RDD[String] = spark.sparkContext.textFile("data/stu.txt")

    // Turn RDD[String] into RDD[Row]
    val rdd2: RDD[Row] = rdd.map(line => {
      val arr = line.split(",")
      Row(arr(0).toInt, arr(1), arr(2).toInt, arr(3), arr(4).toDouble)
    })

    // Build a schema description (the table definition)
    val schema = new StructType(Array(
      new StructField("id", DataTypes.IntegerType),
      new StructField("name", DataTypes.StringType),
      new StructField("age", DataTypes.IntegerType),
      new StructField("city", DataTypes.StringType),
      new StructField("score", DataTypes.DoubleType)
    ))

    // RDD[Row] + schema => DataFrame
    val df = spark.createDataFrame(rdd2, schema)
    df.printSchema()
    df.show()
    // df.where("score > 85").select("id", "name")

    spark.close()
  }
}
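For reference, the same schema can also be built with StructType's fluent add(...) API instead of passing an Array of StructField. A minimal equivalent sketch (an alternative spelling, not used in the original):

// Equivalent schema built with the fluent add(...) API
val schema2 = new StructType()
  .add("id", DataTypes.IntegerType)
  .add("name", DataTypes.StringType)
  .add("age", DataTypes.IntegerType)
  .add("city", DataTypes.StringType)
  .add("score", DataTypes.DoubleType)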
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * @description:
 * Creating DataFrames from RDDs of Seq / Set / Map values
 */
object Demo7_CreateDF_SetSeqMap {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Demo7_CreateDF_SetSeqMap").master("local[*]").getOrCreate()
    import spark.implicits._

    /**
     * Encoding an RDD of Seq values
     */
    val seq1 = Seq(1, 2, 3, 4)
    val seq2 = Seq(11, 22, 33, 44)
    val rdd: RDD[Seq[Int]] = spark.sparkContext.parallelize(List(seq1, seq2))

    val df = rdd.toDF()
    df.printSchema()
    df.show()
    df.selectExpr("value[0]", "size(value)").show()

    /**
     * Encoding an RDD of Set values
     */
    val set1 = Set("a", "b")
    val set2 = Set("c", "d", "e")
    val rdd2: RDD[Set[String]] = spark.sparkContext.parallelize(List(set1, set2))
    val df2 = rdd2.toDF("members")
    df2.printSchema()
    df2.show()

    /**
     * Encoding an RDD of Map values
     */
    val map1 = Map("father" -> "mayun", "mother" -> "tangyan")
    val map2 = Map("father" -> "huateng", "mother" -> "yifei", "brother" -> "sicong")
    val rdd3: RDD[Map[String, String]] = spark.sparkContext.parallelize(List(map1, map2))
    val df3 = rdd3.toDF("jiaren")
    df3.printSchema()
    df3.show()
    df3.selectExpr("jiaren['mother']", "size(jiaren)", "map_keys(jiaren)", "map_values(jiaren)")
      .show(10, false)

    spark.close()
  }
}
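The map column can also be flattened into one row per key/value pair with the explode function. A minimal illustrative sketch (not part of the original demo; it assumes df3 and the spark.implicits._ import from above):

import org.apache.spark.sql.functions.explode
// Illustrative only: explode produces one row per map entry, with "key" and "value" columns
df3.select(explode($"jiaren")).show(10, false)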