当前位置:   article > 正文

Spark RDD使用教程_头歌实验 spark rdd操作数据库

头歌实验 spark rdd操作数据库

1、创建RDD的三种方式

Spark提供三种创建RDD方式:集合、本地文件、HDFS文件。详细可以查看RDDpair RDD文档
使用例子SparkRdd.java

ppackage com.penngo.rdd;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 *
 */
public class SparkRdd {
    public static void main(String[] args) {
        //windows下调试spark需要使用https://github.com/steveloughran/winutils
        System.setProperty("hadoop.home.dir", "D:\\hadoop\\hadoop-3.3.1");
        System.setProperty("HADOOP_USER_NAME", "root");

        SparkSession spark = SparkSession
                .builder()
                .appName("List2Rdd")
                .master("local[*]")
                .getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        // 集合转成RDD
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5,1,2);
        JavaRDD<Integer> rdd1 = sc.parallelize(data);
        int sum1 = rdd1.reduce((Function2<Integer, Integer, Integer>) (integer, integer2) -> integer + integer2);
        System.out.println("sum1============"+sum1);

        // 读取本地文件/hadoop文件转成RDD
        String path = "D:\\project\\data.txt";
        //path = "hdfs://testspark:9000/data.txt"
        JavaRDD<String> rddLine = sc.textFile(path, 2);

        JavaRDD<Integer> rdd2 =  rddLine.flatMap((FlatMapFunction<String, Integer>) s -> {
            String[] strs = s.split(",");
            List<Integer> list = new ArrayList<>();
            for(String str:strs){
                list.add(Integer.valueOf(str.trim()));
            }
            return list.iterator();
        });

        int sum2 = rdd2.reduce((Function2<Integer, Integer, Integer>) (integer, integer2) -> integer + integer2);
        System.out.println("sum2============"+sum2);

        JavaPairRDD<String, Integer> pairs = rdd2.mapToPair(s -> new Tuple2("num_" + s, 1));
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
        counts.foreach(tp2->{
            System.out.println(tp2._1 + "=" + tp2._2);
        });
        spark.stop();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

2、Spark对RDD的操作

Spark对RDD的操作可以整体分为两类:
Transformation(转换):表示是针对RDD中数据的转换操作,主要会针对已有的RDD创建一个新的RDD:常见的有map、flatMap、filter等等。
Action(执行)表示是触发任务执行的操作,主要对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并且还可以把结果返回给Driver程序

2.1、Transformations(转换)

Transformation(转换)

Meaning(含义)

map(func)

返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中的元素应用一个函数 func 来生成。

filter(func)

返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中应用一个函数 func 且返回值为 true 的元素来生成。

flatMap(func)与 map 类似,但是每一个输入的 item 可以被映射成 0 个或多个输出的 items(所以 func 应该返回一个 Seq 而不是一个单独的 item
mapPartitions(func)与 map 类似,但是单独的运行在在每个 RDD 的 partition(分区,block)上,所以在一个类型为 T 的 RDD 上运行时 func 必须是 Iterator<T> => Iterator<U> 类型。
mapPartitionsWithIndex(func)与 mapPartitions 类似,但是也需要提供一个代表 partition 的 index(索引)的 interger value(整型值)作为参数的 func,所以在一个类型为 T 的 RDD 上运行时 func 必须是 (Int, Iterator<T>) => Iterator<U> 类型。
sample(withReplacement, fraction, seed)样本数据,设置是否放回(withReplacement)、采样的百分比(fraction)、使用指定的随机数生成器的种子(seed)。
union(otherDataset)返回一个新的 dataset,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的并集
intersection(otherDataset)

返回一个新的 RDD,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的

distinct([numTasks]))返回一个新的 dataset,它包含了 source dataset(源数据集)中去重的元素。
groupByKey([numTasks])

在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) pairs 的 dataset

注意 : 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 reduceByKey 或 aggregateByKey 来计算性能会更好。
注意 : 默认情况下,并行度取决于父 RDD 的分区数。可以传递一个可选的 numTasks 参数来设置不同的任务数。

reduceByKey(func, [numTasks])

在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) pairs 的 dataset,它的值会针对每一个 key 使用指定的 reduce 函数 func 来聚合,它必须为 (V,V) => V 类型。像 groupByKey 一样,可通过第二个可选参数来配置 reduce 任务的数量。

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) pairs 的 dataset,它的值会针对每一个 key 使用指定的 combine 函数和一个中间的 “zero” 值来聚合,它必须为 (V,V) => V 类型。为了避免不必要的配置,可以使用一个不同与 input value 类型的 aggregated value 类型。
sortByKey([ascending], [numTasks])在一个 (K, V) pair 的 dataset 上调用时,其中的 K 实现了 Ordered,返回一个按 keys 升序或降序的 (K, V) pairs  dataset。
join(otherDataset, [numTasks])在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,它拥有每个 key 中所有的元素对。Outer joins 可以通过 leftOuterJoin,rightOuterJoin fullOuterJoin 来实现。
cogroup(otherDataset, [numTasks])在一个 (K, V) 和的 dataset 上调用时,返回一个 (K, (Iterable<V>, Iterable<W>)) tuples 的 dataset。这个操作也调用了 groupWith
cartesian(otherDataset)在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) pairs 类型的 dataset(所有元素的 pairs,即笛卡尔积)。
pipe(command, [envVars])通过使用 shell 命令来将每个 RDD 的分区给 Pipe。例如,一个 Perl 或 bash 脚本。RDD 的元素会被写入进程的标准输入(stdin),并且 lines(行)输出到它的标准输出(stdout)被作为一个字符串型 RDD 的 string 返回。
coalesce(numPartitions)

Decrease(降低)RDD 中 partitions(分区)的数量为 numPartitions。对于执行过滤后一个大的 dataset 操作是更有效的。

repartition(numPartitions)Reshuffle(重新洗牌)RDD 中的数据以创建或者更多的 partitions(分区)并将每个分区中的数据尽量保持均匀。该操作总是通过网络来 shuffles 所有的数据。
repartitionAndSortWithinPartitions(partitioner)根据给定的 partitioner(分区器)对 RDD 进行重新分区,并在每个结果分区中,按照 key 值对记录排序。这比每一个分区中先调用 repartition 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作的机器上进行。

2.2、Actions(动作)

Action

意思

reduce(func)使用函数 func 聚合数据集(dataset)中的元素,这个函数 func 输入为两个元素,返回为一个元素。这个函数应该是可交换(commutative )和关联(associative)的,这样才能保证它可以被并行地正确计算。
collect()在驱动程序中,以一个数组的形式返回数据集的所有元素。这在返回足够小(sufficiently small)的数据子集的过滤器(filter)或其他操作(other operation)之后通常是有用的。
count()返回数据集中元素的个数。
first()返回数据集中的第一个元素(类似于 take(1))。
take(n)将数据集中的前 n 个元素作为一个数组返回。
takeSample(withReplacementnum[seed])对一个数据集随机抽样,返回一个包含 num 个随机抽样(random sample)元素的数组,参数 withReplacement 指定是否有放回抽样,参数 seed 指定生成随机数的种子。
takeOrdered(n, [ordering])返回 RDD 按自然顺序(natural order)或自定义比较器(custom comparator)排序后的前 n 个元素。
saveAsTextFile(path)将数据集中的元素以文本文件(或文本文件集合)的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中的给定目录中。Spark 将对每个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录。

saveAsSequenceFile(path
(Java and Scala)

将数据集中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统指定的路径中。该操作可以在实现了 Hadoop 的 Writable 接口的键值对(key-value pairs)的 RDD 上使用。在 Scala 中,它还可以隐式转换为 Writable 的类型(Spark 包括了基本类型的转换,例如 IntDoubleString 等等)。

saveAsObjectFile(path
(Java and Scala)

使用 Java 序列化(serialization)以简单的格式(simple format)编写数据集的元素,然后使用 SparkContext.objectFile() 进行加载。
countByKey()仅适用于(K,V)类型的 RDD 。返回具有每个 key 的计数的 (K , Int)对 的 hashmap
foreach(func)对数据集中每个元素运行函数 func 。这通常用于副作用(side effects),例如更新一个累加器(Accumulator)或与外部存储系统(external storage systems)进行交互。注意:修改除 foreach() 之外的累加器以外的变量(variables)可能会导致未定义的行为(undefined behavior)。详细介绍请阅读 理解闭包(Understanding closures) 部分。

参考自官方文档

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/816306
推荐阅读
相关标签
  

闽ICP备14008679号