当前位置:   article > 正文

Spark 性能调优——分布式计算_spark 分布式

spark 分布式

前言


分布式计算的精髓,在于如何把抽象的计算流图,转化为实实在在的分布式计算任务,然后以并行计算的方式交付执行。今天这一讲,我们就来聊一聊,Spark 是如何实现分布式计算的。分布式计算的实现,离不开两个关键要素,一个是进程模型,另一个是分布式的环境部署。接下来,我们先去探讨 Spark 的进程模型,然后再来介绍 Spark 都有哪些分布式部署方式。

触发计算流程图


 函数


 

  1. ##统计单词的次数
  2. import org.apache.spark.rdd.RDD
  3. // 这里的下划线"_"是占位符,代表数据文件的根目录,hdfs的目录地址
  4. val rootPath: String = "/user/hadoop/wikiOfSpark.txt"
  5. val file: String = s"${rootPath}"
  6. // 读取文件内容
  7. val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
  8. // 以行为单位做分词
  9. val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
  10. val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals(""))
  11. // 把RDD元素转换为(Key,Value)的形式
  12. val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
  13. // 按照单词做分组计数
  14. val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
  15. // 打印词频最高的5个词汇
  16. wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)
  17. ##########################
  18. //统计相邻单词共现的次数
  19. 假设,我们再次改变 Word Count 的计算逻辑,由原来统计单词的计数,改为统计相邻单词共现的次数。
  20. import org.apache.spark.rdd.RDD
  21. // 这里的下划线"_"是占位符,代表数据文件的根目录,hdfs的目录地址
  22. val rootPath: String = "/user/hadoop/wikiOfSpark.txt"
  23. val file: String = s"${rootPath}"
  24. // 读取文件内容
  25. val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
  26. // 以行为单位提取相邻单词
  27. val wordPairRDD: RDD[String] = lineRDD.flatMap( line => {
  28. // 将行转换为单词数组
  29. val words: Array[String] = line.split(" ")
  30. // 将单个单词数组,转换为相邻单词数组
  31. for (i <- 0 until words.length - 1) yield words(i) + "-" + words(i+1)
  32. })
  33. val cleanWordRDD: RDD[String] = wordPairRDD.filter(word => !word.equals(""))
  34. // 把RDD元素转换为(Key,Value)的形式
  35. val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
  36. // 按照单词做分组计数
  37. val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
  38. // 打印词频最高的5个词汇
  39. wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)
  40. ##对原来单词的计数,改为对单词的哈希值计数,在这种情况下。我们代码实现需要做哪些改动。
  41. import org.apache.spark.rdd.RDD
  42. import java.security.MessageDigest
  43. // 这里的下划线"_"是占位符,代表数据文件的根目录,hdfs的目录地址
  44. val rootPath: String = "/user/hadoop/wikiOfSpark.txt"
  45. val file: String = s"${rootPath}"
  46. // 读取文件内容
  47. val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
  48. // 以行为单位做分词
  49. val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
  50. val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals(""))
  51. // 把普通RDD转换为Paired RDD
  52. val kvRDD: RDD[(String, Int)] = cleanWordRDD.map{ word =>
  53. // 获取MD5对象实例
  54. val md5 = MessageDigest.getInstance("MD5")
  55. // 使用MD5计算哈希值
  56. val hash = md5.digest(word.getBytes).mkString
  57. // 返回哈希值与数字1的Pair
  58. (hash, 1)
  59. }
  60. // 按照单词做分组计数
  61. val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
  62. // 打印词频最高的5个词汇
  63. wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)

 

  1. import org.apache.spark.sql.expressions.Window
  2. import org.apache.spark.sql.functions._
  3. // 创建表
  4. case class SiteViews(site_id: String, date: String, page_view: Int)
  5. val siteViews = Seq(
  6. SiteViews("a", "2021-05-20", 10),
  7. SiteViews("a", "2021-05-21", 11),
  8. SiteViews("a", "2021-05-22", 12),
  9. SiteViews("a", "2021-05-23", 12),
  10. SiteViews("a", "2021-05-24", 13),
  11. SiteViews("a", "2021-05-25", 14),
  12. SiteViews("a", "2021-05-26", 15),
  13. SiteViews("b", "2021-05-20", 21),
  14. SiteViews("b", "2021-05-21", 22),
  15. SiteViews("b", "2021-05-22", 22),
  16. SiteViews("b", "2021-05-23", 22),
  17. SiteViews("b", "2021-05-24", 23),
  18. SiteViews("b", "2021-05-25", 23),
  19. SiteViews("b", "2021-05-26", 25)
  20. ).toDF()
  21. //
  22. Window.partitionBy("column name"|column)
  23. // orderBy的语法
  24. Window.orderBy("column name"|column)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/695244
推荐阅读
相关标签
  

闽ICP备14008679号