当前位置:   article > 正文

Spark编程进阶--期末总结(书第四章)_spark编程基础期末复习

spark编程基础期末复习

目录

一、Spark和Hadoop的区别

二、安装IDEA

2.1安装Scala

2.2 Scala下载

2.3 Scala插件 (版本要与IDEA版本保持一致,下载2019.2.3版本)的下载安装)

2.4 检测Scala插件是否在IDEA中已经安装成功

2.5 新建scala类文件编写代码

2.6 鼠标点击java文件夹,右键new--->Scala Class

2.7 准备好测试文件words.txt,将文件存放在scalaproject-->data-->input-->words.txt

三、编写本地运行的Spark程序

3.1 编写pom.xml文件

3.2 编写程序

四、spark-submit 详细参数说明

 五、完整代码实现

5.1 WordCount.scala文件在本地运行:

5.2 WordCount.scala文件在yarm上运行:


一、Spark和Hadoop的区别

Hadoop虽然已经成为大数据技术的事实标准,但其本身存在很多缺陷。比如,mapreduce计算模型延迟过高,无法实现实时快速计算的需求,只适用于离线批处理,I/O磁盘开销大。spark在借鉴mapreduce优点同时,很好解决了mapreduce存在的缺陷:

  1. spark计算也属于mapreduce计算,但不局限于map和reduce操作;
  2. spark提供内计算,中间结果放入内存,提高迭代运算效率
  3. 基于DAG的任务调度执行机制,优于mapreduce调度机制。

二、安装IDEA

可以在官网下载安装社区版本:

IntelliJ IDEA – the Leading Java and Kotlin IDE

2.1安装Scala

在File菜单->Settings->Plugins 插件安装界面搜索scala插件安装。

2.2 Scala下载

(我选择的版本是2.12.15)安装及环境变量的配置

官方下载地址:The Scala Programming Language (scala-lang.org)

双击打开下载好的安装程序,一直“Next”即可,最好不要安装到C盘,中间修改一下安装路径即可,最后点击“Finish”。我将scala软件安装在了D盘目录下的Develop文件夹,bin路径如下:

配置scala的系统环境变量,将scala安装的bin目录路径加入到系统环境变量path中:

win+R打开命令窗口输入:scala -verison ,进行检测是否成功配置环境变量

2.3 Scala插件 (版本要与IDEA版本保持一致,下载2019.2.3版本)的下载安装)

Scala - IntelliJ IDEs Plugin | Marketplace

下载完成后,将下载的压缩包解压到IDEA安装目录下的plugins目录下

2.4 检测Scala插件是否在IDEA中已经安装成功

2.5 新建scala类文件编写代码

 

 

2.6 鼠标点击java文件夹,右键new--->Scala Class

 

在WordCount文件中编写如下代码:

  1. import org.apache.spark.sql.SparkSession
  2. object WordCount {
  3. def main(args: Array[String]): Unit = {
  4. val spark = SparkSession
  5. .builder()
  6. .master("local[*]")
  7. .appName("word count")
  8. .getOrCreate()
  9. val sc = spark.sparkContext
  10. val rdd = sc.textFile("data/input/words.txt")
  11. val counts = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
  12. counts.collect().foreach(println)
  13. println("全部的单词数:"+counts.count())
  14. counts.saveAsTextFile("data/output/word-count")
  15. }
  16. }

2.7 准备好测试文件words.txt,将文件存放在scalaproject-->data-->input-->words.txt

 

运行WordCount程序:

运行结果:

三、编写本地运行的Spark程序

3.1 编写pom.xml文件

管理spark程序依赖jar,此时要能上网,在pom.xml文件中,添加如下配置信息

  1. <repositories>
  2. <repository>
  3. <id>aliyun</id>
  4. <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  5. </repository>
  6. <repository>
  7. <id>apache</id>
  8. <url>https://repository.apache.org/content/repositories/snapshots/</url>
  9. </repository>
  10. <repository>
  11. <id>cloudera</id>
  12. <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  13. </repository>
  14. </repositories>
  15. <properties>
  16. <encoding>UTF-8</encoding>
  17. <maven.compiler.source>1.8</maven.compiler.source>
  18. <maven.compiler.target>1.8</maven.compiler.target>
  19. <scala.version>2.12.10</scala.version>
  20. <spark.version>3.0.1</spark.version>
  21. <hadoop.version>2.7.7</hadoop.version>
  22. </properties>
  23. <dependencies>
  24. <!--依赖Scala语言-->
  25. <dependency>
  26. <groupId>org.scala-lang</groupId>
  27. <artifactId>scala-library</artifactId>
  28. <version>${scala.version}</version>
  29. </dependency>
  30. <!--SparkCore依赖-->
  31. <dependency>
  32. <groupId>org.apache.spark</groupId>
  33. <artifactId>spark-core_2.12</artifactId>
  34. <version>${spark.version}</version>
  35. </dependency>
  36. <!--SparkSQL依赖-->
  37. <dependency>
  38. <groupId>org.apache.spark</groupId>
  39. <artifactId>spark-sql_2.12</artifactId>
  40. <version>${spark.version}</version>
  41. </dependency>
  42. <dependency>
  43. <groupId>org.apache.hadoop</groupId>
  44. <artifactId>hadoop-client</artifactId>
  45. <version>2.7.5</version>
  46. </dependency>
  47. <dependency>
  48. <groupId>com.hankcs</groupId>
  49. <artifactId>hanlp</artifactId>
  50. <version>portable-1.7.7</version>
  51. </dependency>
  52. <dependency>
  53. <groupId>org.projectlombok</groupId>
  54. <artifactId>lombok</artifactId>
  55. <version>1.18.2</version>
  56. <scope>provided</scope>
  57. </dependency>
  58. </dependencies>
  59. <build>
  60. <sourceDirectory>src/main/scala</sourceDirectory>
  61. <plugins>
  62. <!-- 指定编译java的插件 -->
  63. <plugin>
  64. <groupId>org.apache.maven.plugins</groupId>
  65. <artifactId>maven-compiler-plugin</artifactId>
  66. <version>3.5.1</version>
  67. </plugin>
  68. <!-- 指定编译scala的插件 -->
  69. <plugin>
  70. <groupId>net.alchim31.maven</groupId>
  71. <artifactId>scala-maven-plugin</artifactId>
  72. <version>3.2.2</version>
  73. <executions>
  74. <execution>
  75. <goals>
  76. <goal>compile</goal>
  77. <goal>testCompile</goal>
  78. </goals>
  79. <configuration>
  80. <args>
  81. <arg>-dependencyfile</arg>
  82. <arg>${project.build.directory}/.scala_dependencies</arg>
  83. </args>
  84. </configuration>
  85. </execution>
  86. </executions>
  87. </plugin>
  88. <plugin>
  89. <groupId>org.apache.maven.plugins</groupId>
  90. <artifactId>maven-surefire-plugin</artifactId>
  91. <version>2.18.1</version>
  92. <configuration>
  93. <useFile>false</useFile>
  94. <disableXmlReport>true</disableXmlReport>
  95. <includes>
  96. <include>**/*Test.*</include>
  97. <include>**/*Suite.*</include>
  98. </includes>
  99. </configuration>
  100. </plugin>
  101. <plugin>
  102. <groupId>org.apache.maven.plugins</groupId>
  103. <artifactId>maven-shade-plugin</artifactId>
  104. <version>2.3</version>
  105. <executions>
  106. <execution>
  107. <phase>package</phase>
  108. <goals>
  109. <goal>shade</goal>
  110. </goals>
  111. <configuration>
  112. <filters>
  113. <filter>
  114. <artifact>*:*</artifact>
  115. <excludes>
  116. <exclude>META-INF/*.SF</exclude>
  117. <exclude>META-INF/*.DSA</exclude>
  118. <exclude>META-INF/*.RSA</exclude>
  119. </excludes>
  120. </filter>
  121. </filters>
  122. <transformers>
  123. <transformer
  124. implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  125. <mainClass></mainClass>
  126. </transformer>
  127. </transformers>
  128. </configuration>
  129. </execution>
  130. </executions>
  131. </plugin>
  132. </plugins>
  133. </build>

刷新maven工程,会自动下载所需依赖jar,此时会下载时间较长,耐心等待

3.2 编写程序

WordCount .scala文件实现单词计数关键代码的解析如下:

  1. //1.env/准备sc/SparkContext/Spark上下文执行环境
  2. val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
  3. val sc: SparkContext = new SparkContext(conf)
  4. sc.setLogLevel("WARN")
  5. //2.source/读取数据
  6. val lines: RDD[String] = sc.textFile("data/input/words.txt")
  7. //(这里要特别注意一下,你自己电脑的目录下要保证有这个words.txt文件)
  8. //3.transformation/数据操作/转换
  9. //切割:RDD[一个个的单词]
  10. val words: RDD[String] = lines.flatMap(_.split(" "))
  11. //记为1:RDD[(单词, 1)]
  12. val wordAndOnes: RDD[(String, Int)] = words.map((_,1))
  13. val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_)
  14. //4.输出
  15. //直接输出
  16. result.foreach(println)
  17. //输出到指定path(可以是文件/夹)
  18. result.repartition(1).saveAsTextFile("data/output/result")
  19. //为了便于查看Web-UI可以让程序
  20. Thread.sleep(1000 * 60)
  21. //5.关闭资源
  22. sc.stop()

四、spark-submit 详细参数说明

 

 五、完整代码实现

5.1 WordCount.scala文件在本地运行:

  1. package net.objet
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object WordCount {
  5. def main(args: Array[String]): Unit = {
  6. //1.env/准备sc/SparkContext/Spark上下文执行环境
  7. val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
  8. val sc: SparkContext = new SparkContext(conf)
  9. sc.setLogLevel("WARN")
  10. //2.source/读取数据
  11. val lines: RDD[String] = sc.textFile("data/input/words.txt")
  12. //3.transformation/数据操作/转换
  13. //切割:RDD[一个个的单词]
  14. val words: RDD[String] = lines.flatMap(_.split(" "))
  15. //记为1:RDD[(单词, 1)]
  16. val wordAndOnes: RDD[(String, Int)] = words.map((_,1))
  17. val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_)
  18. //4.输出
  19. //直接输出
  20. result.foreach(println)
  21. //输出到指定path(可以是文件/夹)
  22. result.repartition(1).saveAsTextFile("data/output/result")
  23. //为了便于查看Web-UI可以让程序
  24. Thread.sleep(1000 * 60)
  25. //5.关闭资源
  26. sc.stop()
  27. }
  28. }

5.2 WordCount.scala文件在yarm上运行:

  1. package net
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object WordCount {
  5. def main(args: Array[String]): Unit = {
  6. if(args.length < 2){
  7. println("请指定input和output")
  8. System.exit(1)//非0表示非正常退出程序
  9. }
  10. //1.env/准备sc/SparkContext/Spark上下文执行环境
  11. val conf: SparkConf = new SparkConf().setAppName("wc") //.setMaster("local[*]")
  12. val sc: SparkContext = new SparkContext(conf)
  13. sc.setLogLevel("WARN")
  14. //2.source/读取数据
  15. val lines: RDD[String] = sc.textFile(args(0))
  16. //3.transformation/数据操作/转换
  17. //切割:RDD[一个个的单词]
  18. val words: RDD[String] = lines.flatMap(_.split(" "))
  19. //记为1:RDD[(单词, 1)]
  20. val wordAndOnes: RDD[(String, Int)] = words.map((_,1))
  21. val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_)
  22. //4.输出
  23. //直接输出
  24. result.foreach(println)
  25. //输出到指定path(可以是文件/夹)
  26. result.repartition(1).saveAsTextFile(args(1))
  27. //为了便于查看Web-UI可以让程序Thread.sleep(1000 * 60)
  28. //5.关闭资源
  29. sc.stop()
  30. }
  31. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/801195
推荐阅读
相关标签
  

闽ICP备14008679号