当前位置:   article > 正文

Spark学习,RDD算子,RDD的读写等一篇总结

Spark学习,RDD算子,RDD的读写等一篇总结

1.RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集

RDD 是一个数据集的表示,不仅表示了数据集,还表示了这个数据集从哪来,如何计算,主要属性包括:

  • 分区列表
  • 计算函数
  • 依赖关系
  • 分区函数(默认是 hash)
  • 最佳位置

1.1 RDD算子

RDD算子用来处理RDD数据集,跟flink的算子很多都相似。

  • Transformation转换操作:返回一个新的 RDD
  • Action动作操作:返回值不是 RDD(无返回值或返回其他的)

其实算子就是用来处理我们SQL解决不了或者解决很慢的一种方法

1.1.1Transformation

​ 转换算子

map(func):返回一个新的 RDD,该 RDD 由每一个输入元素经过 func 函数转换后组成

比如你有一个list类型的RDD,你可以使用map(func)将String类型的转换成Iteger类型的RDD,他传入的是一个函数,所以需要提供对应的函数。

filter(func):返回一个新的 RDD,用于过滤限定条件的算子,该 RDD 由经过 func 函数计算后返回值为 true 的输入元素组成,也就是说新的rdd是大于10的数据集。

public class SparkFilterExample {
    public static void main(String[] args) {
        // 创建一个包含一些数字的 RDD
        JavaRDD<Integer> numbers = sc.parallelize(new Integer[]{1, 15, 20, 4, 12, 7});
        // 使用 filter 算子筛选出大于 10 的数字
        JavaRDD<Integer> filteredNumbers = numbers.filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer number) throws Exception {
                return number > 10;
            }
        });
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

flatMap(func): 类似于 map,但是每一个输入元素可以被映射为 0 或多个输出元素(所以 func 应该返回一个序列,而不是单一元素)

union(otherDataset):对源 RDD 和参数 RDD 求并集后返回一个新的 RDD

groupByKey([numTasks]): 在一个(K,V)的 RDD 上调用,返回一个(K, Iterator[V])的 RDD

// 使用 flatMapToPair,算用户浏览过的不同商品的种类数。,先用flatMapToPair返回(k,v)的集合
        JavaPairRDD<String, String> userProducts = clickStreams.flatMapToPair(line -> {
            String[] parts = line.split("\\|");
            if ("product_page".equals(parts[2])) {  // 只处理产品页面的点击
                return Arrays.asList(new Tuple2<>(parts[0], parts[3]));
            } else {
                return Arrays.asList();  // 如果不是产品页面,则返回空列表
            }
        });
    // 使用 groupByKey 对用户 ID 进行分组
JavaPairRDD<String, Iterable<String>> groupedUserProducts = userProducts.groupByKey();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

reduceByKey(func, [numTasks]): 在一个(K,V)的 RDD 上调用,返回一个(K,V)的 RDD,使用指定的 reduce 函数,将相同 key 的值聚合到一起,与 groupByKey 类似,reduce 任务的个数可以通过第二个可选的参数来设置

sortByKey([ascending], [numTasks]):在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序的(K,V)的 RDD

1.1.2Action

动作算子

saveAsTextFile(path):将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本。

saveAsSequenceFile(path): 将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统

countByKey():针对(K,V)类型的 RDD,返回一个(K,Int)的 map,表示每一个 key 对应的元素个数

foreachPartition(func):在数据集的每一个分区上,运行函数 func

2.Spark的缓存cache() ,persist()

他们都是用于将一个RDD进行缓存的

cache()
区别cache()的缓存机制为MEMORY_AND_DISK。即RDD的数据直接以Java对象的形式存储一份于JVM的内存中,如果数据在内存中放不下,则溢写到磁盘上.需要时则会从磁盘上读取。

persist()
persist的默认缓存机制为级别设置,storageLevel=StorageLevel.MEMORY_AND_DISK

createOrReplaceTempView()

这个用来创建临时的视图,如果存在相同的视图就替换,用法如下

//这里面写对应的SQL
Dataset<Row> result = SparkUtil.getInstance().getSparkSession()
                .sql()
//将sql的执行结果用视图的方式写入Spark缓存
result.persist(StorageLevel.MEMORY_AND_DISK()).createOrReplaceTempView("tmp_xdr_volte_sip_view");
  • 1
  • 2
  • 3
  • 4
  • 5

总结:

RDD 持久化/缓存的目的是为了提高后续操作的速度
缓存的级别有很多,默认只存在内存中,开发中使用 memory_and_disk
只有执行 action 操作的时候才会真正将 RDD 数据进行持久化/缓存
实际开发中如果某一个 RDD 后续会被频繁的使用,可以将该 RDD 进行持久化/缓存

3.RDD 容错机制Checkpoint

Checkpoint 的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在在 HDFS 上,这就天然的借助了 HDFS 天生的高容错、高可靠来实现数据最大程度上的安全,实现了 RDD 的容错和高可用。

开发中如何保证数据的安全性性及读取效率:可以对频繁使用且重要的数据,先做缓存/持久化,再做 checkpint 操作。

持久化和 Checkpoint 的区别:

位置:Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存–实验中) Checkpoint 可以保存数据到 HDFS 这类可靠的存储上。
生命周期:Cache 和 Persist 的 RDD 会在程序结束后会被清除或者手动调用 unpersist 方法 Checkpoint 的 RDD 在程序结束后依然存在,不会被删除。

4. Spark SQL

4.1Spark SQL 数据抽象

① DataFrame:DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库的二维表格,带有 Schema 元信息(可以理解为数据库的列名和类型)。DataFrame = RDD + 泛型 + SQL 的操作 + 优化

② DataSet:DataSet是DataFrame的进一步发展,它比RDD保存了更多的描述信息,概念上等同于关系型数据库中的二维表,它保存了类型信息,是强类型的,提供了编译时类型检查。调用 Dataset 的方法先会生成逻辑计划,然后被 spark 的优化器进行优化,最终生成物理计划,然后提交到集群中运行!DataFrame = Dateset[Row]

4.2 Spark SQL从Mysql读写数据

从mysql读

val prop = new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","root")
spark.read.jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show()
  • 1
  • 2
  • 3
  • 4
  • 5

写入mysql

val prop = new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","root")
personDF.write.mode(SaveMode.Overwrite).jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)
  • 1
  • 2
  • 3
  • 4
  • 5

写入hdfs的分区目录下parquet后面就是对应的hdfs目录

        resDataSet.repartition(partitionSelect()).write().mode(SaveMode.Overwrite).parquet(PathUtils.pathCombine(
                ConfigHelper.getOutputRootPath(),
                "prov_id=" + ConfigHelper.getProvinceId(),
                "day_id=" + ConfigHelper.getDate(),
                "hour_id=" + ConfigHelper.getHour()
        ));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/955493
推荐阅读
相关标签
  

闽ICP备14008679号