赞
踩
parquet是一种列式存储格式的文件类型。存储时可以通过牛X的压缩算法节省存储空间,读取数据时只需要读取所需的列,提高读取性能。
使用 Idea 或 ScalaIDE 创建一个 maven 项目,pom依赖在文章的末尾;在项目中创建一个 scala class,编写如下代码:
PS:不清楚如何创建Spark项目的小伙伴可以看我的这篇文章(如何开发SparkSQL项目?),其实创建Spark的项目和maven的项目没啥太大的区别,只是添加了一个ScalaSDK,在 IDEA 上安装了一个插件而已,很简单。
import org.apache.log4j.{ Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.sql.{ SaveMode, SparkSession} import scala.collection.mutable.ArrayBuffer /** * 用户样本类 * @param user_id 用户id * @param user_name 用户名称 */ case class User(user_id: Int, user_name: String) object ParquetDemo { def main(args: Array[String]): Unit = { // 配置SparkSession的相关信息 val conf = new SparkConf().setAppName("parquetDemo").setMaster("local[*]") // 创建SparkSession val spark = SparkSession.builder().config(conf).getOrCreate() // 获取SparkContext val sc = spark.sparkContext // 注:在实际spark开发中上面的几行代码基本属于标配,固定格式,不会有太大的改动 // 创建一个可变数组,向数组中添加10个用户 val rows = ArrayBuffer[User]() for (i <- 0 until 10) { rows += User(i, s"name${i}") } // 将数组转变成不可变数组 val rowArray = rows.toArray // 使用用户数组来创建RDD val rowRDD = sc.parallelize(rowArray) // 导入spark的隐式转换函数toDF,将RDD转换成DataFrame import spark.implicits._ // 会在当前项目的根目录创建一个spark-warehouse文件夹,里面会有一个user文件夹,里面会生成一些.parquet文件 rowRDD.toDF().write.mode(SaveMode.Overwrite).saveAsTable("user") } }
运行上述代码,我们会在项目根目录下发现生成了如下.parquet文件:
我们通过前一个例子生成了.parquet格式的文件,打开一看发现都是乱码,那么我们该如何读取查看这类.parquet格式的数据呢?
在前一个示例代码的基础上,添加如下代码即可读取.parquet格式的数据:
// 把下面生成.parquet文件这行代码注释掉
// rowRDD.toDF().write.mode(SaveMode.Overwrite).saveAsTable("user")
// 读取.parquet格式的文件
val localDF = spark.read.load("E:\\personal-project\\localhive\\spark-warehouse\\user")
// 展示读取到的数据
localDF.show
再次运行代码,运行结果如下:
成功读取到了我们之前模拟生成的用户数据,而且显示的格式还非常好看。
json 格式的数据大家都清楚长什
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。