val rdd = sc.parallelize(list)//将相同key的值聚合到一起scala> val rdd1 = rdd.reduceByKey((a,b)=>a+b)//查看结果scala> rdd1.collect_spark算子练习">
当前位置:   article > 正文

SparkRDD算子练习_spark算子练习

spark算子练习

练习一:

val list = List(("zhangsan",85),("zhangsan",90),("zhangsan",76),("lisi",80),("lisi",75),("lisi",89))

要求:

1. 创建对应的RDD,命名为rdd

2. 使用map算子,将rdd的数据进行转换操作

      输出每个同学的平均分

  1. scala> val list = List(("zhangsan",85),("zhangsan",90),("zhangsan",76),("lisi",80),("lisi",75),("lisi",89))
  2. scala> val rdd = sc.parallelize(list)
  3. //将相同key的值聚合到一起
  4. scala> val rdd1 = rdd.reduceByKey((a,b)=>a+b)
  5. //查看结果
  6. scala> rdd1.collect()
  7. res5: Array[(String, Int)] = Array((zhangsan,251), (lisi,244))
  8. //让rdd1的每个元素做的输出x._1第一个元素不变,x._2第二个元素做除3的操作
  9. scala> val rdd2 = rdd1.map(x=>(x._1,x._2/3))
  10. //查看结果
  11. scala> rdd2.collect()
  12. res6: Array[(String, Int)] = Array((zhangsan,83), (lisi,81))

练习二:

val arr = Array(1,2,3,4,5)

要求:

1. 创建对应的RDD,命名为rdd

2. 使用map算子,将rdd的数据进行转换操作

     分别使用count()、first()、take()、reduce()、foreach()等方法输出

  1. scala> val arrRdd = sc.parallelize(Array(1,2,3,4,5))
  2. //计数
  3. scala> arrRdd.count()
  4. res7: Long = 5
  5. //返回第一个元素
  6. scala> arrRdd.first()
  7. res8: Int = 1
  8. //返回前三个元素
  9. scala> arrRdd.take(3)
  10. res9: Array[Int] = Array(1, 2, 3)
  11. //做聚合统计
  12. scala> arrRdd.reduce((a,b)=>a+b)
  13. res10: Int = 15
  14. //让arrRdd中每个元素做println()的输出操作
  15. scala> arrRdd.foreach(x=>println(x))
  16. 1
  17. 2
  18. 3
  19. 4
  20. 5
  21. //简化
  22. scala> arrRdd.foreach(println)
  23. 1
  24. 2
  25. 3
  26. 4
  27. 5

练习三:

val arr = Array(88,85,90)

要求:
1. 创建对应的RDD,命名为rdd1

2. 使用map算子,将rdd1的数据进行转换操作

    使用zip()方法,输出的格式<--zhangsan,88-->  <--lisi,85-->...

  1. scala> val rdd1 = sc.parallelize(Array(88,85,90))
  2. scala> val rdd2 = sc.parallelize(Array("zhangsan","lisi","wangwu"))
  3. //做拉链操作,一个rdd1中元素对应一个rdd2中元素返回新的rdd
  4. scala> val rdd3 = rdd1.zip(rdd2)
  5. //查看结果
  6. scala> rdd3.collect()
  7. res13: Array[(Int, String)] = Array((88,zhangsan), (85,lisi), (90,wangwu))
  8. scala> val rdd4 = rdd2.zip(rdd1)
  9. scala> rdd4.collect()
  10. res14: Array[(String, Int)] = Array((zhangsan,88), (lisi,85), (wangwu,90))

练习四:、

val list = List("dog","an","cat","an","cat")

要求:
1. 创建对应的RDD,命名为rdd

2. 使用map算子,将rdd的数据进行转换操作

    先输出rdd中每个元素的长度,再使用zip()方法

    最后去重

   输出格式:<--dog,3-->  <--an,2-->...

  1. val rdd = sc.parallelize(List("dog","an","cat","an","cat"))
  2. //第一步:通过获取rdd中每个元素的长度创建新的rdd1
  3. scala> val rdd1 = rdd.map(_.length)
  4. scala> rdd1.collect()
  5. res16: Array[Int] = Array(3, 2, 3, 2, 3)
  6. //第二步:通过zip把rdd1和rdd组合创建rdd2
  7. scala> val rdd2 = rdd.zip(rdd1)
  8. scala> rdd2.collect()
  9. res17: Array[(String, Int)] = Array((dog,3), (an,2), (cat,3), (an,2), (cat,3))
  10. //第三步:去重
  11. scala> val rdd3 = rdd2.distinct()
  12. //第四步:输出结果
  13. scala> rdd3.collect()
  14. res18: Array[(String, Int)] = Array((an,2), (dog,3), (cat,3))
  15. scala> rdd3.foreach(println)
  16. (an,2)
  17. (dog,3)
  18. (cat,3)

练习五:

val list = List(1,2,3,4,5,6)
要求:
1. 创建对应的RDD,命名为rdd

2. 使用map算子,将rdd的数据进行转换操作
 规则如下:
*      偶数转换成该数的平方

*      奇数转换成该数的立方

  1. scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))
  2. //取出偶数元素
  3. scala> val rdd1 = rdd.filter(_ % 2 == 0)
  4. scala> rdd1.collect()
  5. res19: Array[Int] = Array(2, 4, 6)
  6. //换成平方
  7. scala> val rdd2 = rdd1.map(x=>x*x)
  8. scala> rdd2.collect()
  9. res20: Array[Int] = Array(4, 16, 36)
  10. //取出奇数元素
  11. scala> val rdd3 = rdd.filter(_ % 2 == 1)
  12. scala> rdd3.collect()
  13. res21: Array[Int] = Array(1, 3, 5)
  14. //换成立方
  15. scala> val rdd4 = rdd3.map(x=>x*x*x)
  16. scala> rdd4.collect()
  17. res22: Array[Int] = Array(1, 27, 125)
  18. //简化:
  19. scala> val rdd1 = rdd.map(x=>if(x%2==0){x*x}else{x*x*x})
  20. //输出
  21. scala> rdd1.foreach(println)
  22. 1
  23. 4
  24. 27
  25. 16
  26. 125
  27. 36

练习六:

有一个数组,数组元素为"dog", "salmon", "salmon", "rat", "elephant"
要求:
1. 创建对应的RDD

2. 使用map算子,将rdd的数据进行转换操作
         * 规则如下:
         *      将字符串与该字符串的长度组合成一个元组,例如:dog  -->  (dog,3),salmon   -->  (salmon,6)

  1. scala> val rdd = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"))
  2. //第一步:通过获取rdd中每个元素的长度创建新的rdd1
  3. scala> val rdd1 = rdd.map(_.length)
  4. scala> rdd1.collect()
  5. res23: Array[Int] = Array(3, 6, 6, 3, 8)
  6. //第二步:通过zip把rdd1和rdd组合创建rdd2
  7. scala> val rdd2 = rdd.zip(rdd1)
  8. scala> rdd2.collect()
  9. res24: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
  10. //第三步:去重
  11. scala> val rdd3 = rdd2.distinct()
  12. //第四步:输出结果
  13. scala> rdd3.collect()
  14. res25: Array[(String, Int)] = Array((salmon,6), (rat,3), (dog,3), (elephant,8))
  15. //简化
  16. scala> val rdd1 = rdd.map(x=>(x,x.length()))
  17. //输出
  18. scala> rdd1.foreach(println)
  19. (dog,3)
  20. (salmon,6)
  21. (salmon,6)
  22. (rat,3)
  23. (elephant,8)

练习七:

有一个words.txt文件,内容如下:
hello,world,hello,spark
good,nice,good,do
要求:
将该文件上传到HDFS下/spark/test目录下,并创建RDD数据集,然后完成以下步骤:

  1. scala> val rdd = sc.textFile("/spark/test/word.txt")
  2. scala> rdd.collect()
  3. res0: Array[String] = Array(hello,world,hello,spark, good,nice,good,do)
  4. scala> rdd.count
  5. res13: Long = 2
  6. //第一步:对所给数据创建的rdd切割分词
  7. cala> val rdd1 = rdd.flatMap(x=>x.split(","))
  8. scala> rdd1.collect()
  9. res1: Array[String] = Array(hello, world, hello, spark, good, nice, good, do)
  10. scala> rdd1.count
  11. res15: Long = 8
  12. //第二步:每个单词计数为1
  13. scala> val rdd2 = rdd1.map(x=>(x,1))
  14. scala> rdd2.collect()
  15. res2: Array[(String, Int)] = Array((hello,1), (world,1), (hello,1), (spark,1), (good,1), (nice,1), (good,1), (do,1))
  16. //第三步:对相同单词个数进行累加
  17. scala> val rdd3 = rdd2.reduceByKey((a,b)=>a+b)
  18. scala> rdd3.collect()
  19. res3: Array[(String, Int)] = Array((spark,1), (do,1), (nice,1), (hello,2), (good,2), (world,1))
  20. //第四步:过滤出单词个数大于一个的
  21. scala> val rdd4 = rdd3.map(x=>(x._1,x._2>1))
  22. scala> rdd4.collect()
  23. res5: Array[(String, Boolean)] = Array((spark,false), (do,false), (nice,false), (hello,true), (good,true), (world,false))
  24. //简化
  25. scala> val rdd4 = rdd3.filter(x=>x._2>1)
  26. //第五步:输出结果
  27. scala> rdd4.foreach(println)
  28. (hello,2)
  29. (good,2)

练习八:

某商店上午卖出10本 spark 书籍,每本50元,4本 Hadoop 书籍,每本40元,下午卖出20本 spark 书籍,每本40元,10本 Hadoop 书籍,每本30元。
现要求求出这两本书这一天销售的平均价格。
数据如下:
spark,10,50
spark,20,40
hadoop,4,40
hadoop,10,30

提示:List(("spark",(10,50)),("hadoop",(4,40)),("hadoop",(10,30)),("spark",(20,40)))

  1. 要求:
  2. //第一步:通过给定数据通过集合并行化创建rdd
  3. scala> val rdd = sc.parallelize(List(("spark",(10,50)),("hadoop",(4,40)),("hadoop",(10,30)),("spark",(20,40))))
  4. //第二步:求出不同书籍一天收入总和以及出售本数
  5. scala> val rdd1 = rdd.reduceByKey((a,b)=>(a._1*a._2+b._1*b._2,a._1+b._1))
  6. scala> rdd1.collect()
  7. res19: Array[(String, (Int, Int))] = Array((spark,(1300,30)), (hadoop,(460,14)))
  8. //第三步:求出每本平均售价
  9. scala> val rdd2 = rdd1.map(x=>(x._1,x._2._1/x._2._2))
  10. //输出结果
  11. scala> rdd2.foreach(println)
  12. (spark,43)
  13. (hadoop,32)

练习九:

List(("Bob","spark"),("Lily","hadoop"),("Candy","hive"),("Bob","hbase"),("Bob","hive"))

根据姓名对所学书籍分组

求出每个人的书籍本数

根据项目排序

输出结果

综合案例

有一份数据格式如下的文档:

日期,姓名,app,下载渠道,地区,版本号

  1. 1. 2017-08-14,Lily,Facebook,360 Shop,NewYork,v1.0
  2. 2. 2017-08-14,Bob,Facebook,Amazon Appstore,NewYork,v1.2
  3. 3. 2017-08-14,Lily,Facebook,360 Shop,Washington,v1.2
  4. 4. 2017-08-14,Lily,Facebook,Google Play Store,Washington,v2.0
  5. 5. 2017-08-14,Candy,YouTube,app store,Chicago,v1.8
  6. 6. 2017-08-14,Lily,Facebook,Google Play Store,Washington,v2.0
  7. 7. 2017-08-14,Candy,YouTube,app store,Chicago,v1.9
  8. 8. 2017-08-15,Candy,YouTube,app store,Chicago,v2.0
  9. 9. 2017-08-15,Candy,YouTube,app store,Chicago,v2.3
  10. 10. 2017-08-15,Lily,Facebook,360 Shop,NewYork,v2.0
  11. 11. 2017-08-15,Bob,Facebook,Amazon Appstore,NewYork,v1.2
  12. 12. 2017-08-15,Bob,Facebook,Amazon Appstore,NewYork,v1.5
  13. 13. 2017-08-15,Candy,YouTube,app store,Chicago,v2.9

需求: 不考虑地区,列出版本升级情况。

结果格式: 日期,姓名,app,下载渠道,升级前版本,升级后版本。

例: 数据: 

  1. 1. 2017-08-14,Lily,Facebook,360 Shop,NewYork,v1.0
  2. 2. 2017-08-14,Lily,Facebook,360 Shop,Washington,v1.2
  3. 3. 2017-08-14,Lily,Facebook,360 Shop,NewYork,v2.0

结果:

  1. 1. (2017-08-14,Lily,Facebook,360 Shop,v1.0,v1.2
  2. 2. (2017-08-14,Lily,Facebook,360 Shop,v1.2,v2.0

要求:

//根据需求,去除城市字段

//按key分组,key是除城市字段和版本号字段以外的所有字段,value是版本号

//过滤版本号重复的(例:(v2.0,v2.0))以及版本号只有一个的(例(v1.0))

//拆分重新组合(例:(key,(v2.0,v2.5,v3.0))拆分成(key,(v2.0,v2.5))(key,(v2.5,v3.0)))

//按需求整理输出格式(例:(2017-08-14,Lily,Facebook,360 Shop,v1.2,v2.0))

//执行foreach操作,打印出结果

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/819894
推荐阅读
相关标签
  

闽ICP备14008679号