赞
踩
RDD是Spark编程的核心,在进行Spark编程时,首要任务是创建一个初始的RDD这样就相当于设置了Spark应用程序的输入源数据然后在创建了初始的RDD之后,才可以通过Spark 提供的一些高阶函数,对这个RDD进行操作,来获取其它的RDD。
Spark提供三种创建RDD方式:集合、本地文件、HDFS文件
如果要通过集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上,形成一个分布式的数据集合,也就是一个RDD。相当于,集合中的部分数据会到一个节点上,而另一部分数据会到其它节点上。然后就可以用并行的方式来操作这个分布式数据集合了。
调用parallelize()时,有一个重要的参数可以指定,就是将集合切分成多少个partition。Spark会为每一个partition运行一个task来进行处理。Spark默认会根据集群的配置来设置partition的数量。我们也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量,例如:parallelize(arr, 5)
引入依赖:
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
- <version>2.4.3</version>
- <!-- <scope>provided</scope>-->
- </dependency>
Scala代码:
- package com.sanqian.scala
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- object CreateRddByArrayScala {
- def main(args: Array[String]): Unit = {
- //创建SparkContext
- val conf = new SparkConf()
- conf.setAppName("CreateRddByArray").setMaster("local")
- val sc = new SparkContext(conf)
- //创建集合
- val arr = Array(1, 2, 3, 4, 5)
- //基于集合创建RDD
- val rdd = sc.parallelize(arr)
- //对集合中的元素求和
- val sum = rdd.reduce(_ + _)
-
- //注意:这行println代码是在driver进程中执行的
- println(sum)
- }
- }
注意:val arr = Array(1,2,3,4,5)还有println(sum)代码是在driver进程中执行的,这些代码不会并行执行,parallelize还有reduce之类的操作是在worker节点中执行的
Java代码:
- package com.sanqian.java;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function2;
-
- import java.util.Arrays;
- import java.util.List;
-
- public class CreateRddByArrayJava {
- public static void main(String[] args) {
- //创建JavaSparkContext
- SparkConf conf = new SparkConf();
- conf.setAppName("CreateRddByArrayJava").setMaster("local");
- JavaSparkContext sc = new JavaSparkContext(conf);
-
- //创建集合
- List<Integer> arr = Arrays.asList(1, 2, 3, 4, 5);
- JavaRDD<Integer> rdd = sc.parallelize(arr);
- Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer integer, Integer integer2) throws Exception {
- return integer + integer2;
- }
- });
-
- System.out.println(sum);
-
- sc.stop();
- }
- }
Scala代码:
- package com.sanqian.scala
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- object CreateRddByFileScala {
- def main(args: Array[String]): Unit = {
- //创建SparkContext
- val conf = new SparkConf()
- conf.setAppName("CreateRddByArray").setMaster("local")
- val sc = new SparkContext(conf)
-
- var path = "D:\\data\\words.txt"
- path = "hdfs://bigdata01:9000/words.txt"
- val rdd = sc.textFile(path)
- val length = rdd.map(_.length).reduce(_ + _)
- println(length)
- sc.stop()
- }
- }
Java代码:
- package com.sanqian.java;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.api.java.function.Function2;
-
- import java.util.Arrays;
- import java.util.List;
-
- public class CreateRddByFileJava {
- public static void main(String[] args) {
- //创建JavaSparkContext
- SparkConf conf = new SparkConf();
- conf.setAppName("CreateRddByArrayJava").setMaster("local");
- JavaSparkContext sc = new JavaSparkContext(conf);
-
- //创建集合
- String path = "D:\\data\\words.txt";
- path = "hdfs://bigdata01:9000/words.txt";
- JavaRDD<String> rdd = sc.textFile(path, 2);
-
- //获取每一行数据的长度
- JavaRDD<Integer> numRDD = rdd.map(new Function<String, Integer>() {
- @Override
- public Integer call(String s) throws Exception {
- return s.length();
- }
- });
- //计算文件内数据的总长度
- Integer sum = numRDD.reduce(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer integer, Integer integer2) throws Exception {
- return integer + integer2;
- }
- });
-
- System.out.println(sum);
-
- sc.stop();
- }
- }
Spark对RDD的操作可以整体分为两类:
不管是Transformation操作还是Action操作,一般会把它们称之为算子,例如:map算子,reduce算子。
其中Transformation算子有一个特性:lazy,lazy特性在这里指的是,如果一个spark任务中只定义了transformation算子,那么即使你执行这个任务,任务中的算子也不会执行。也就是说,transformation是不会触发spark任务的执行,它们只是记录了对RDD所做的操作,不会执行。只有当transformation之后,接着执行了一个action操作,那么所有的transformation才会执行。Spark通过lazy这种特性,来进行底层的spark任务执行的优化,避免产生过多中间结果。
Action的特性:执行Action操作才会触发一个Spark任务的运行,从而触发这个Action之前所有的
Transformation的执行。
那下面我们先来看一下Spark中的Transformation算子
先来看一下官方文档,进入2.4.3的文档界面
算子 | 介绍 |
map | 将RDD中的每个元素进行处理,一进一出 |
filter | 对RDD中每个元素进行判断,返回true则保留 |
flatMap | 与map类似,但是每个元素都可以返回一个或多个新元素 |
groupByKey | 根据key进行分组,每个key对应一个Iterable<value> |
reduceByKey | 对每个相同key对应的value进行reduce操作 |
sortByKey | 对每个相同key对应的value进行排序操作(全局排序) |
join | 对两个包含<key,value>对的RDD进行join操作 |
distinct | 对RDD中的元素进行全局去重 |
4. Scala代码:
- package com.sanqian.scala
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- /**
- * 需求:transformation 实战
- * map: 对集合中的每个元素乘以2
- * filter: 过滤出集合中的偶数
- * flatMap: 将行拆分为单词
- * groupByKey:对每个大区的主播进行分组
- * reduceByKey: 统计每个大区的主播数量
- * sortByKey: 对主播的音浪收入排序
- * join: 打印每个主播的大区信息和音浪收入
- * distinct: 统计当天开播的主播数量
- */
- object TransformationOpScala {
-
-
- def main(args: Array[String]): Unit = {
- val sc = getSparkContext
- //map: 对集合中的每个元素乘以2
- // mapOp(sc)
- //filter: 过滤出集合中的偶数
- // filterOp(sc)
- //flatMap: 将行拆分为单词
- // flatMapOp(sc)
-
- //groupByKey:对每个大区的主播进行分组
- // groupByKeyOp2(sc)
- //reduceByKey: 统计每个大区的主播数量
- // reduceBykeyOp(sc)
- //sortByKey: 对主播的音浪收入排序
- // sortByKeyOp(sc)
- //join: 打印每个主播的大区信息和音浪收入
- // joinOp(sc)
- //distinct: 统计当天开播的大区信息
- distinctOp(sc)
- }
- def distinctOp(sc: SparkContext): Unit = {
- val rdd = sc.parallelize(Array((150001, "US"), (150002, "CN"), (15003, "CN"), (15004, "IN")))
- //由于是统计开播的大区信息,需要根据大区信息去重,所以只保留大区信息
- rdd.map(_._2).distinct().foreach(println(_))
- }
-
- def joinOp(sc: SparkContext): Unit = {
- val rdd = sc.parallelize(Array((150001, "US"), (150002, "CN"), (15003, "CN"), (15004, "IN")))
- val rdd2 = sc.parallelize(Array((150001, 400), (150002, 200), (15003, 300), (15004, 100)))
-
- rdd.join(rdd2).foreach(tup => {
- //用户id
- val uid = tup._1
- val area_gold = tup._2
- println(uid + "\t" + area_gold._1 +"\t" + area_gold._2)
- })
- }
-
- def sortByKeyOp(sc: SparkContext): Unit = {
- val rdd = sc.parallelize(Array((150001, 400), (150002, 200), (15003, 300), (15004, 100)))
- // rdd.map(tup => (tup._2, tup._1)).sortByKey(ascending = false)
- // .foreach(println(_))
- //sortBy的使用,可以动态指定排序字段比较灵活
- rdd.sortBy(_._2, ascending = false).foreach(println(_))
- }
- def reduceBykeyOp(sc: SparkContext): Unit = {
- val rdd = sc.parallelize(Array((150001, "US"), (150002, "CN"), (15003, "CN"), (15004, "IN")))
- rdd.map(tup => (tup._2, 1)).reduceByKey(_ + _).foreach(println(_))
- }
-
- def groupByKeyOp(sc: SparkContext): Unit = {
- val rdd = sc.parallelize(Array((150001, "US"), (150002, "CN"), (15003, "CN"), (15004, "IN")))
- rdd.map(tup => (tup._2, tup._1)).groupByKey().foreach(tup => {
- //获取大区信息
- val area = tup._1
- print(area + ":")
- //获取同一个大区对应的所有用户id
- val it = tup._2
- for (uid <- it) {
- print(uid + " ")
- }
- println()
- })
- }
- // 如果tuple中的数据列超过了2列怎么办?
- // 把需要作为key的那一列作为tuple2的第一列,剩下的可以再使用tuple2包装一下
- //注意:如果你的数据结构比较复杂,可以在执行每一个算子之后都调用foreach打印一下,确认数据的格式
- def groupByKeyOp2(sc: SparkContext): Unit = {
- val rdd = sc.parallelize(Array((150001, "US", "male"), (150002, "CN", "female"), (15003, "CN", "male"), (15004, "IN", "male")))
- rdd.map(tup => (tup._2, (tup._1, tup._3))).groupByKey().foreach(tup => {
- //获取大区信息
- val area = tup._1
- print(area + ":")
- //获取同一个大区对应的所有用户id
- val it = tup._2
- for ((uid, sex) <- it) {
- print("<" + uid + "," + sex + "> ")
- }
- println()
- })
- }
-
- def mapOp(sc: SparkContext): Unit = {
- val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
- rdd.map(_ * 2).foreach(println(_))
- }
-
- def flatMapOp(sc: SparkContext): Unit = {
- val rdd = sc.parallelize(Array("good good study", "day day up"))
- rdd.flatMap(_.split(" ")).foreach(println(_))
- }
-
- def filterOp(sc: SparkContext): Unit = {
- val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
- //满足条件的保存下来
- rdd.filter(_ % 2 == 0).foreach(println(_))
- }
-
- def getSparkContext = {
- //创建SparkContext
- val conf = new SparkConf()
- conf.setAppName("TransformationOpScala").setMaster("local")
- new SparkContext(conf)
- }
- }
5. Java代码:
- package com.sanqian.java;
-
- import jdk.nashorn.internal.ir.FunctionCall;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.*;
- import scala.Tuple2;
- import scala.Tuple3;
-
- import java.util.Arrays;
- import java.util.Iterator;
-
- public class TransformationOpJava {
- public static void main(String[] args) {
- JavaSparkContext sc = getSparkContext();
- //map: 对集合中的每个元素乘以2
- // mapOp(sc);
- //filter: 过滤出集合中的偶数
- // filterOp(sc);
- //flatMap: 将行拆分为单词
- // flatMapOp(sc);
- //groupByKey:对每个大区的主播进行分组
- // groupByKeyOp(sc);
- // groupByKeyOp2(sc);
- //reduceByKey: 统计每个大区的主播数量
- // reduceByKeyOp(sc);
- //sortByKey: 对主播的音浪收入排序
- // sortByKeyOp(sc);
- //join: 打印每个主播的大区信息和音浪收入
- // joinOp(sc);
- //distinct: 统计每个大区的主播
- distinctOp(sc);
-
- }
-
- private static void distinctOp(JavaSparkContext sc) {
- Tuple2<Integer, String> t5 = new Tuple2<Integer, String>(150001, "US");
- Tuple2<Integer, String> t6 = new Tuple2<Integer, String>(150002, "CN");
- Tuple2<Integer, String> t7 = new Tuple2<Integer, String>(150003, "CN");
- Tuple2<Integer, String> t8 = new Tuple2<Integer, String>(150004, "IN");
- JavaRDD<Tuple2<Integer, String>> rdd2 = sc.parallelize(Arrays.asList(t5, t6, t7, t8));
- rdd2.map(new Function<Tuple2<Integer, String>, String>() {
- @Override
- public String call(Tuple2<Integer, String> v1) throws Exception {
- return v1._2;
- }
- }).distinct().foreach(new VoidFunction<String>() {
- @Override
- public void call(String s) throws Exception {
- System.out.println(s);
- }
- });
- }
-
- private static void joinOp(JavaSparkContext sc) {
- Tuple2<Integer, Integer> t1 = new Tuple2<Integer, Integer>(150002, 200);
- Tuple2<Integer, Integer> t3 = new Tuple2<Integer, Integer>(150001, 400);
- Tuple2<Integer, Integer> t2 = new Tuple2<Integer, Integer>(150003, 300);
- Tuple2<Integer, Integer> t4 = new Tuple2<Integer, Integer>(150004, 100);
- JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
-
- Tuple2<Integer, String> t5 = new Tuple2<Integer, String>(150001, "US");
- Tuple2<Integer, String> t6 = new Tuple2<Integer, String>(150002, "CN");
- Tuple2<Integer, String> t7 = new Tuple2<Integer, String>(150003, "CN");
- Tuple2<Integer, String> t8 = new Tuple2<Integer, String>(150004, "IN");
- JavaRDD<Tuple2<Integer, String>> rdd2 = sc.parallelize(Arrays.asList(t5, t6, t7, t8));
-
- JavaPairRDD<Integer, Integer> rddPair = rdd.mapToPair(new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> tup) throws Exception {
- return new Tuple2<Integer, Integer>(tup._1, tup._2);
- }
- });
- JavaPairRDD<Integer, String> rdd2Pair = rdd2.mapToPair(new PairFunction<Tuple2<Integer, String>, Integer, String>() {
- @Override
- public Tuple2<Integer, String> call(Tuple2<Integer, String> tup) throws Exception {
- return new Tuple2<Integer, String>(tup._1, tup._2);
- }
- });
-
- rddPair.join(rdd2Pair).foreach(new VoidFunction<Tuple2<Integer, Tuple2<Integer, String>>>() {
- @Override
- public void call(Tuple2<Integer, Tuple2<Integer, String>> tuple) throws Exception {
- //主播编号
- Integer uid = tuple._1;
- //大区和音浪收入信息
- Tuple2<Integer, String> tu = tuple._2();
- System.out.println(uid + "\t" + tu._1 + "\t" + tu._2);
- }
- });
-
- }
-
- private static void sortByKeyOp(JavaSparkContext sc) {
- //(150001, 400), (150002, 200), (15003, 300), (15004, 100)
- Tuple2<Integer, Integer> t1 = new Tuple2<Integer, Integer>(150002, 200);
- Tuple2<Integer, Integer> t3 = new Tuple2<Integer, Integer>(150001, 400);
- Tuple2<Integer, Integer> t2 = new Tuple2<Integer, Integer>(150003, 300);
- Tuple2<Integer, Integer> t4 = new Tuple2<Integer, Integer>(150004, 100);
- JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
-
-
-
- /*
- rdd.mapToPair(new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> tup) throws Exception {
- return new Tuple2<Integer, Integer>(tup._2, tup._1);
- }
- }).sortByKey(false).foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
- @Override
- public void call(Tuple2<Integer, Integer> tup) throws Exception {
- System.out.println(tup);
- }
- });
- */
- rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() {
- @Override
- public Integer call(Tuple2<Integer, Integer> v1) throws Exception {
- return v1._2();
- }
- }, false, 1).foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
- @Override
- public void call(Tuple2<Integer, Integer> tup) throws Exception {
- System.out.println(tup);
- }
- });
- }
-
-
- private static void reduceByKeyOp(JavaSparkContext sc) {
- Tuple2<Integer, String> t1 = new Tuple2<Integer, String>(150001, "US");
- Tuple2<Integer, String> t2 = new Tuple2<Integer, String>(150002, "CN");
- Tuple2<Integer, String> t3 = new Tuple2<Integer, String>(150003, "CN");
- Tuple2<Integer, String> t4 = new Tuple2<Integer, String>(150004, "IN");
- JavaRDD<Tuple2<Integer, String>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
-
- rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(Tuple2<Integer, String> tup) throws Exception {
- return new Tuple2<String, Integer>(tup._2, 1);
- }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer integer, Integer integer2) throws Exception {
- return integer + integer2;
- }
- }).foreach(new VoidFunction<Tuple2<String, Integer>>() {
- @Override
- public void call(Tuple2<String, Integer> tuple) throws Exception {
- System.out.println(tuple);
- }
- });
- }
-
- private static void groupByKeyOp2(JavaSparkContext sc) {
- Tuple3<Integer, String, String> t1 = new Tuple3<Integer, String, String>(150001, "US", "male");
- Tuple3<Integer, String, String> t2 = new Tuple3<Integer, String, String>(150002, "CN", "female");
- Tuple3<Integer, String, String> t3 = new Tuple3<Integer, String, String>(150003, "CN", "female");
- Tuple3<Integer, String, String> t4 = new Tuple3<Integer, String, String>(150004, "IN", "male");
- JavaRDD<Tuple3<Integer, String, String>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
-
- rdd.mapToPair(new PairFunction<Tuple3<Integer, String, String>, String, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Tuple2<String, Integer>> call(Tuple3<Integer, String, String> tup) throws Exception {
- return new Tuple2<String, Tuple2<String, Integer>>(tup._2(), new Tuple2<String, Integer>(tup._3(), tup._1()));
- }
- }).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Tuple2<String, Integer>>>>() {
- @Override
- public void call(Tuple2<String, Iterable<Tuple2<String, Integer>>> tup) throws Exception {
- //大区信息
- String area = tup._1;
- System.out.print(area + ":");
- //获取同一个大区所有用户对应的性别信息
- Iterable<Tuple2<String, Integer>> it = tup._2;
- for (Tuple2<String, Integer> tu : it) {
- System.out.print("<" + tu._2 + "," + tu._1 + ">");
- }
- System.out.println();
-
- }
- });
- }
-
- private static void groupByKeyOp(JavaSparkContext sc) {
- Tuple2<Integer, String> t1 = new Tuple2<Integer, String>(150001, "US");
- Tuple2<Integer, String> t2 = new Tuple2<Integer, String>(150002, "CN");
- Tuple2<Integer, String> t3 = new Tuple2<Integer, String>(150003, "CN");
- Tuple2<Integer, String> t4 = new Tuple2<Integer, String>(150004, "IN");
- JavaRDD<Tuple2<Integer, String>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
- rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(Tuple2<Integer, String> tup) throws Exception {
- return new Tuple2<String, Integer>(tup._2, tup._1);
- }
- }).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
- @Override
- public void call(Tuple2<String, Iterable<Integer>> tup) throws Exception {
- //获取大区信息
- String area = tup._1;
- System.out.print(area + ":");
- Iterable<Integer> it = tup._2;
- for (Integer uid : it) {
- System.out.print(uid + " ");
- }
- System.out.println();
- }
- });
- }
-
- private static void flatMapOp(JavaSparkContext sc) {
- JavaRDD<String> rdd = sc.parallelize(Arrays.asList("good good study", "day day up"));
- rdd.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String line) throws Exception {
- return Arrays.asList(line.split(" ")).iterator();
- }
- }).foreach(new VoidFunction<String>() {
- @Override
- public void call(String word) throws Exception {
- System.out.println(word);
- }
- });
- }
-
- private static void filterOp(JavaSparkContext sc) {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- rdd.filter(new Function<Integer, Boolean>() {
- @Override
- public Boolean call(Integer integer) throws Exception {
- return integer % 2 == 0;
- }
- }).foreach(new VoidFunction<Integer>() {
- @Override
- public void call(Integer integer) throws Exception {
- System.out.println(integer);
- }
- });
- }
-
- private static void mapOp(JavaSparkContext sc) {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- rdd.map(new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer integer) throws Exception {
- return integer * 2;
- }
- }).foreach(new VoidFunction<Integer>() {
- @Override
- public void call(Integer integer) throws Exception {
- System.out.println(integer);
- }
- });
- }
-
- public static JavaSparkContext getSparkContext() {
- SparkConf conf = new SparkConf();
- conf.setAppName("TransformationOpJava").setMaster("local");
- return new JavaSparkContext(conf);
- }
-
-
- }
RDD Programming Guide - Spark 2.4.3 Documentation (apache.org)
算子 | 介绍 |
reduce | 将RDD中的所有元素进行聚合操作 |
collect | 将RDD中的所有元素拉取到本地客户端(Driver) |
take(n) | 获取RDD中前n个元素 |
count | 获取RDD元素总数 |
saveAsTextFile | 将RDD中元素保存在文件中,对每个元素调用toStrin |
countByKey | 对每个key对应的值进行count计数 |
foreach | 便利RDD中的每个元素 |
- package com.sanqian.scala
-
- import org.apache.spark.{SparkConf, SparkContext}
-
- /**
- * 需求:Action实战
- * reduce:聚合计算
- * collect:获取元素集合
- * take(n):获取前n个元素
- * count:获取元素总数
- * saveAsTextFile:保存文件
- * countByKey:统计相同的key出现多少次
- * foreach:迭代遍历元素
- * Created by
- */
- object ActionOpScala {
- def main(args: Array[String]): Unit = {
- val sc = getSparkContext
- //reduce:聚合计算
- // reduceOp(sc)
- //collect:获取元素集合
- // collectOp(sc)
- //take(n):获取前n个元素
- // takeOp(sc)
- //count:获取元素总数
- // countOp(sc)
- //saveAsTextFile:保存文件
- // saveAsTextFileOp(sc)
- //countByKey:统计相同的key出现多少次
- // countByKeyOp(sc)
- //foreach:迭代遍历元素
- // foreachOp(sc)
- }
- private def foreachOp(sc: SparkContext) = {
- val rdd = sc.parallelize(Array(1,2,3,4,5))
- rdd.foreach(println(_))
- }
-
- private def countByKeyOp(sc: SparkContext) = {
- val rdd = sc.parallelize(Array(("A",1001),("B",1002),("A",1003),("C",1004)))
- val res = rdd.countByKey()
- for((k, v) <- res){
- println(k + "," + v)
- }
- }
-
- private def saveAsTextFileOp(sc: SparkContext) = {
- val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
- //指定HDFS的路径信息即可,需要指定一个不存在的目录
- rdd.saveAsTextFile("hdfs://bigdata01:9000/out0104")
- }
-
- private def countOp(sc: SparkContext) = {
- val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
- //如果想要获取几条数据,查看一下数据格式,可以使用take(n)
- val count = rdd.count()
- print(count)
- }
-
-
- private def takeOp(sc: SparkContext) = {
- val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
- //如果想要获取几条数据,查看一下数据格式,可以使用take(n)
- val res = rdd.take(2)
- for (i <- res) {
- println(i)
- }
- }
-
- private def collectOp(sc: SparkContext) = {
- val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
- //注意:如果RDD中数据量过大,不建议使用collect,因为最终的数据会返回给Driver进程所在的节点
- //如果想要获取几条数据,查看一下数据格式,可以使用take(n)
- val res = rdd.collect()
- for (i <- res) {
- println(i)
- }
- }
-
- private def reduceOp(sc: SparkContext) = {
-
- val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
- val num = rdd.reduce(_ + _)
- println(num)
- }
-
- private def getSparkContext = {
- val conf = new SparkConf()
- conf.setAppName("ActionOpScala").setMaster("local[*]")
- new SparkContext(conf)
- }
- }
- package com.sanqian.java;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.api.java.function.PairFunction;
- import org.apache.spark.api.java.function.VoidFunction;
- import scala.Tuple2;
- import sun.awt.windows.WPrinterJob;
-
- import java.util.Arrays;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
-
- public class ActionOpJava {
- public static void main(String[] args) {
- JavaSparkContext sc = getSparkContext();
- //reduce:聚合计算
- // reduceOp(sc);
- //collect:获取元素集合
- // collectOp(sc);
- //take(n):获取前n个元素
- // takeOp(sc);
- //count:获取元素总数
- // countOp(sc);
- //saveAsTextFile:保存文件
- // saveAsTextFileOp(sc);
- //countByKey:统计相同的key出现多少次
- // countByKeyOp(sc);
- //foreach:迭代遍历元素
- foreachOp(sc);
-
- }
-
- public static void foreachOp(JavaSparkContext sc) {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- rdd.foreach(new VoidFunction<Integer>() {
- @Override
- public void call(Integer integer) throws Exception {
- System.out.println(integer);
- }
- });
- }
-
-
- public static void countByKeyOp(JavaSparkContext sc) {
- Tuple2<String, Integer> t1 = new Tuple2<String, Integer>("A", 1001);
- Tuple2<String, Integer> t2 = new Tuple2<String, Integer>("B", 1002);
- Tuple2<String, Integer> t3 = new Tuple2<String, Integer>("A", 1003);
- Tuple2<String, Integer> t4 = new Tuple2<String, Integer>("C", 1004);
- JavaRDD<Tuple2<String, Integer>> rdd = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
- Map<String, Long> res = rdd.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(Tuple2<String, Integer> tup) throws Exception {
- return new Tuple2<String, Integer>(tup._1, tup._2);
- }
- }).countByKey();
-
- for (Map.Entry<String, Long> entry : res.entrySet()) {
- System.out.println(entry.getKey() + "," + entry.getValue());
- }
-
- }
-
- public static void countOp(JavaSparkContext sc) {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- long num = rdd.count();
- System.out.println(num);
- }
-
- public static void saveAsTextFileOp(JavaSparkContext sc) {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- rdd.saveAsTextFile("hdfs://bigdata01:9000/output0104");
- }
-
- public static void takeOp(JavaSparkContext sc) {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- List<Integer> res = rdd.take(2);
- for (Integer i : res) {
- System.out.println(i);
- }
- }
-
- public static void collectOp(JavaSparkContext sc) {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- List<Integer> res = rdd.collect();
- for (Integer i : res) {
- System.out.println(i);
- }
- }
-
- public static void reduceOp(JavaSparkContext sc) {
- JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
- Integer num = rdd.reduce(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer v1, Integer v2) throws Exception {
- return v1 + v2;
- }
- });
- System.out.println(num);
- }
-
- public static JavaSparkContext getSparkContext() {
- SparkConf conf = new SparkConf();
- conf.setAppName("ActionOpJava").setMaster("local[*]");
- return new JavaSparkContext(conf);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。