赞
踩
每一天都会进行更新,一起冲击未来
StructType---定义数据框的结构
StructType定义DataFrame的结构,是StructField对象的集合或者列表,通过printSchema可以打印出所谓的表字段名,StructType就是所有字段的集合。在创建dataframe的时候,将StructType作为字段的集合,按照顺序一一给各个字段。
StructField--定义DataFrame列的元数据
StructField来定义列名,列类型,可为空的列和元数据。
首先创建样例类
- case class StructType(fields: Array[StructField])
- case class StructField(
- name: String,
- dataType: DataType,
- nullable: Boolean = true,
- metadata: Metadata = Metadata.empty)
创建相关的数据以及字段名
- //创建数据集合
- val simpleData = Seq(
- Row("James ","","Smith","36636","M",3000),
- Row("Michael ","Rose","","40288","M",4000),
- Row("Robert ","","Williams","42114","M",4000),
- Row("Maria ","Anne","Jones","39192","F",4000),
- Row("Jen","Mary","Brown","","F",-1) )
- //创建StructType对象,里面是Array[StructField]类型
- val simpleSchema = StructType(Array(
- StructField("firstname",StringType,true),
- StructField("middlename",StringType,true),
- StructField("lastname",StringType,true),
- StructField("id", StringType, true),
- StructField("gender", StringType, true),
- StructField("salary", IntegerType, true) ))
- //创建dataFrame
- val df = spark.createDataFrame(
- spark.sparkContext.parallelize(simpleData),simpleSchema)
- //打印Schema
- df.printSchema()
代码很简答,需要一个数据集合,创建一个StructType对象,里面包含StructField对象。
前面说过,StructField对象里面包含的是列名以及各种信息。
创建DataFrame。此时,元数据就是simpleData,所谓的Schema就是simpleSchema。
看一下各个字段以及“表结构”
其实上面的案例也比较有一些麻烦,下面来看一下另外一种方法,不用创建样例类
通过StructType.add进行操作,意味着我们不用再去创建StructField对象,通过add方法,只需要写入字段名称和字段方法就可以完成这个操作。
- //创建上下文环境 SparkSql环境
- val sparkSQL = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
- val sparkSession = SparkSession.builder().config(sparkSQL).getOrCreate()
- import sparkSession.implicits._
- //数据集合
- val simpData = Seq(Row("James", "", "Smith", "36636", "M", 3000),
- Row("Michael", "Rose", "", "40288", "M", 4000),
- Row("Robert", "", "Williams", "42114", "M", 4000),
- Row("Maria", "Anne", "Jones", "39121", "F", 4000),
- Row("Jen", "Mary", "Brown", "", "F", -1))
- //创建StructType对象,将字段进行累加
- val structType = new StructType()
- .add("firstname", StringType)
- .add("middlename", StringType)
- .add("lastname", StringType)
- .add("id", StringType)
- .add("gender", StringType)
- .add("salary", StringType)
- //创建DataFrame
- val dataFrame = sparkSession.createDataFrame(
- sparkSession.sparkContext.parallelize(simpData), structType)
- dataFrame.printSchema()
- sparkSession.close()
同样也是需要数据集合以及StructType对象。不过这种操作更加的简便,重要的是不会报错,用最上面的方法创建样例类可能会报错,需要导入不同的包。
- //创建Spark SQL环境
- val sparkSQL = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
- val sparkSession = SparkSession.builder().config(sparkSQL).getOrCreate()
- import sparkSession.implicits._
- //创建数据集,其中最里面的Row对象就是嵌套对象
- val structData = Seq( Row(Row("James ", "", "Smith"), "36636", "M", 3100),
- Row(Row("Michael ", "Rose", ""), "40288", "M", 4300),
- Row(Row("Robert ", "", "Williams"), "42114", "M", 1400),
- Row(Row("Maria ", "Anne", "Jones"), "39192", "F", 5500),
- Row(Row("Jen", "Mary", "Brown"), "", "F", -1))
- //通过StructType的add方法进行添加字段
- val structType = new StructType()
- .add("name",new StructType()
- .add("firstname",StringType)
- .add("middlename",StringType)
- .add("lastname",StringType))
- .add("id",StringType)
- .add("gender",StringType)
- .add("salary",StringType)
- //创建dataframe
- val dataFrame =
- sparkSession.createDataFrame(
- sparkSession.sparkContext.parallelize(structData), structType)
- //打印schema
- dataFrame.printSchema()
- sparkSession.close()
因为name字段进行了嵌套,因此在"name"字段后面的类型里面不再是StringType.而是一个嵌套类型 StructType,这个嵌套类型里面再继续进行add。在这里面嵌套了三个字段。
可以看上面Schema。那么字段的类型是Struct结构。这个Struct结构里面嵌套了三个字段。
其实上面写错了,纠正一下,最后一个字段应该是IntegerType类型
如果写StringType类型,虽然打印Schema没有报错,但是进行select的时候就会报错。所以需要进行修改,在这里说明一下。
dataFrame.select("name").show(false)
看一下嵌套字段的name
本来以为Spark SQL的知识只有一点点,没有想到的是Spark SQL里面的知识很多很多,不单单是SQL语言,虽然可以结合Hive或者Mysql写SQL,但是结构化数据使用本身的DSL+SQL更加的简单。
SQL是重中之重,SQL能解决90%问题,剩下解决不了的问题就交给RDD把
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。