赞
踩
结构化数据(structured data)处理
的 Spark 模块不但兼容 Hive,还可以从 RDD、parquet 文件、JSON 文件中获取数据
,未来版本甚至支持获取 RDBMS 数据以及 cassandra 等 NOSQL 数据;SparkSession 内部封装了 SparkContext
,所以计算实际上是由 sparkContext 完成的。当我们使用 spark-shell 的时候, spark 框架会自动的创建一个名称叫做 spark SparkSession 对 象, 就像我们以前可以自动获取到一个 sc 来表示 SparkContext 对象一样在 Spark 中,DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库中的二维表格
。
进入spark的shell终端
(1) 从 Spark 数据源进行创建
scala> spark.read.
由下图可知,可以读取以下类型的数据
读取 json 文件创建 DataFrame
在 spark 的 bin/input 目录中创建 user.json 文件
{"username":"zhangsan","age":20}
{"username":"lisi","age":30}
{"username":"王五","age":40}
代码如下(示例):
scala> val df = spark.read.json("input/user.json")
scala> df.show
SQL 语法风格
是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助
(1) 通过刚刚创建的DataFrame创建一个临时表
scala> val df = spark.read.json("input/user.json")
(2) 通过 SQL 语句实现查询全表
scala> val sqlDF = spark.sql("select * from user")
scala> val sqlDF = spark.sql("select age from user")
普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表
。使用全局临时表时需要全路径访问,如:global_temp.user
(1) 对于 DataFrame 创建一个全局表
scala> df.createGlobalTempView("user1")
(2) 通过 SQL 语句实现查询全表(不同session)
scala> spark.sql("select username from global_temp.user1").show()
scala> spark.newSession().sql("select username from global_temp.user1").show()
DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了
(1) 创建一个 DataFrame
scala> val df = spark.read.json("input/user.json")
(2) 查看 DataFrame 的 Schema 信息
scala> df.printSchema
(3) 只查看"username"列数据
scala> df.select("username").show()
(4) 查看"username"列数据以及"age+1"数据
注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名
scala> df.select($"username",$"age"+1).show()
scala> df.select('username, 'age+1).show()
(5) 查看"age"大于"30"的数据
scala> df.select('age > 30).show()
(6) 按照"age"分组,查看数据条数
scala> df.groupBy("age").count.show()
DataSet 是具有强类型的数据集合
,需要提供对应的类型信息。
scala> val caseClassDS = List(User("zhangsan",22),User("lisi",31)).toDS()
scala> caseClassDS.show
序列中数据类型保持一致
scala> val listDs = List(1,2,3,4).toDS
scala> val list2Ds = List("3","4").toDS
scala> listDs.show()
注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet
从版本的产生上来看:
如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的 Spark 版本中,DataSet 有可能会逐步取代 RDD和 DataFrame 成为唯一的 API 接口。
惰性机制
,在进行创建、转换,如 map 方法时,不会立即执行,只有在遇到Action 如 foreach 时,三者才会开始遍历运算
;importspark.implicits._(在 创建好 SparkSession 对象后尽量直接导入)
类型固定为 Row
,每一列的值没法直接访问,只有通过解析才能获取各个字段的值DataFrame 其实就是 DataSet 的一个特例 type DataFrame = Dataset[Row]
RDD更多的是关注数据本身,DataFrame更多的则是由数据的结构,DataSet是数据的类型
// 准备环境 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sql_trans") val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() //spark 不是包名,是上下文环境对象名 import spark.implicits._ // RDD => DataFrame val rdd: RDD[(Int, String, Int)] = spark.sparkContext .makeRDD(List((1, "zhangsan", 30), (2, "lisi", 26), (3, "wangwu", 29))) val df: DataFrame = rdd.toDF("id", "username", "age") df.show() // DataFrame => DataSet // 样例类User val ds: Dataset[User] = df.as[User] ds.show() // RDD => DataSet val ds2: Dataset[User] = rdd.map { case (id, name, age) => { User(id, name, age) } }.toDS() ds2.show() println("--------------------------------------------") // DataSet => RDD val rdd2: RDD[User] = ds2.rdd // DataSet => DataFrame val df2: DataFrame = ds2.toDF() df2.show() //DataFrame => RDD val rdd3: RDD[Row] = df2.rdd /** * RDD 返回的 RDD 类型为 Row,里面提供的 getXXX 方法可以获取字段值 * 类似 jdbc 处理结果集,但是索引从 0 开始 */ // 对应id : 2 3 1 rdd3.foreach(filed => print(filed.getInt(0) + "\t")) // 对应name : lisi wangwu zhangsan rdd3.foreach(filed => print(filed.getString(1) + "\t")) // 对应age : 26 29 30 rdd3.foreach(filed => print(filed.getInt(2) + "\t")) // 关闭 spark.close() } // 样例类 case class User(id: Int, username: String, age: Int)
结果:
+---+--------+---+ | id|username|age| +---+--------+---+ | 1|zhangsan| 30| | 2| lisi| 26| | 3| wangwu| 29| +---+--------+---+ +---+--------+---+ | id|username|age| +---+--------+---+ | 1|zhangsan| 30| | 2| lisi| 26| | 3| wangwu| 29| +---+--------+---+ +---+--------+---+ | id|username|age| +---+--------+---+ | 1|zhangsan| 30| | 2| lisi| 26| | 3| wangwu| 29| +---+--------+---+ -------------------------------------------- +---+--------+---+ | id|username|age| +---+--------+---+ | 1|zhangsan| 30| | 2| lisi| 26| | 3| wangwu| 29| +---+--------+---+ 2 3 1 lisi wangwu zhangsan 26 29 30 Process finished with exit code 0
文章也是仅作知识点的记录,欢迎大家指出错误,一起探讨~~~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。