
Writing a Spark WordCount Example in IDEA (Scala and Java)

Writing a Spark program in IDEA:

Create an RDD, then operate on it by calling the RDD's methods. The methods fall into two categories: Transformations (lazy) and Actions (which trigger execution); the sketch below illustrates the difference.

The methods on an RDD are not the same as Scala's native collection methods.

Write the program, package it, and run it on the cluster.

To run a Spark program in local mode, set .setMaster("local[*]") on the SparkConf.
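A minimal sketch of the lazy/eager distinction and the local master setting (the object name LocalDemo and the toy data are only for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object LocalDemo {
  def main(args: Array[String]): Unit = {
    // local[*] runs Spark inside the IDE, using all local cores
    val conf = new SparkConf().setAppName("LocalDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val nums = sc.parallelize(1 to 10)
    // map is a Transformation: lazy, nothing is computed yet
    val doubled = nums.map(_ * 2)
    // collect is an Action: it triggers the actual job
    println(doubled.collect().mkString(", "))
    sc.stop()
  }
}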

1. Writing in Scala

1.1 Configure the pom.xml file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.caimh.spark</groupId>
    <artifactId>sparktest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
        <hadoop.version>2.6.5</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <!-- Scala dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Pin the hadoop-client API version -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <!-- Plugin that compiles Scala -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- Plugin that compiles Java -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Plugin that builds the (shaded) jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

1.2 Write the Spark program

package com.caimh.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by caimh on 2019/10/28.
  */
object ScalaWordCount {

  def main(args: Array[String]): Unit = {
    // 1. Create the Spark configuration and set the application name
    val conf = new SparkConf().setAppName("ScalaWordCount")
    // 2. Create the entry point for Spark execution
    val sc = new SparkContext(conf)
    // 3. Specify where the data is read from, creating an RDD (Resilient Distributed Dataset)
    val lines: RDD[String] = sc.textFile(args(0))
    // Split and flatten
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // Pair each word with a 1
    val wordmap: RDD[(String, Int)] = words.map((_, 1))
    // Aggregate by key
    val wordreduced: RDD[(String, Int)] = wordmap.reduceByKey(_ + _)
    // Sort by count, descending
    val wordsorted: RDD[(String, Int)] = wordreduced.sortBy(_._2, false)
    // Save the result to HDFS
    wordsorted.saveAsTextFile(args(1))
    // Release resources
    sc.stop()
  }
}

1.3 Package with Maven
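From the project root this is the standard Maven build (since the shade plugin configured above is bound to the package phase, this produces the fat jar under target/):

mvn clean package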

1.4 Pick the successfully built jar and upload it to one of the nodes in the Spark cluster
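For example with scp (a sketch; the destination directory matches the path used in the spark-submit command below, adjust to your own environment):

scp target/sparktest-1.0-SNAPSHOT.jar caimh@master-node:/opt/module/spark-2.1.1/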

1.5 Start the cluster first

If Spark is configured as a Standalone cluster, start the Spark cluster and the HDFS cluster.

If it is configured to run on YARN, start the HDFS and YARN clusters.

My setup uses YARN.
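If HDFS and YARN are not running yet, they can be started with the standard Hadoop scripts (a sketch, assuming $HADOOP_HOME/sbin is on the PATH):

start-dfs.sh
start-yarn.sh

The running daemons can then be checked with jps: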

[caimh@master-node spark-2.1.1]$ jps
46368 Jps
44469 ResourceManager
45048 QuorumPeerMain
44056 NameNode
44585 NodeManager
44170 DataNode

1.6 Submit the Spark application with spark-submit (note the order of the arguments)

[caimh@master-node spark-2.1.1]$ bin/spark-submit \
> --master yarn-cluster \
> --class com.caimh.spark.ScalaWordCount \
> /opt/module/spark-2.1.1/sparktest-1.0-SNAPSHOT.jar hdfs://master-node:9000/wordcount.txt hdfs://master-node:9000/wordcount
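Side note: in Spark 2.x, --master yarn-cluster still works but is deprecated; the equivalent form with the newer flags is:

bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.caimh.spark.ScalaWordCount \
  /opt/module/spark-2.1.1/sparktest-1.0-SNAPSHOT.jar hdfs://master-node:9000/wordcount.txt hdfs://master-node:9000/wordcount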

1.7 View the program's results

View in the YARN web UI (by default the ResourceManager UI is at http://master-node:8088).

View the output in HDFS.
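For example, from the command line (assuming an HDFS client is configured on the node):

hdfs dfs -ls hdfs://master-node:9000/wordcount
hdfs dfs -cat hdfs://master-node:9000/wordcount/part-*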

2. Writing in Java

2.1 Write the Spark program

package com.caimh.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * Created by caimh on 2019/10/29.
 */
public class JavaWordCount {

    public static void main(String[] args) {
        // Create the configuration
        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        // Create the SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Specify where the data is read from
        JavaRDD<String> lines = jsc.textFile(args[0]);
        // Split and flatten (functional style)
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        // Pair each word with a 1
        JavaPairRDD<String, Integer> wordTuple = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });
        // Aggregate by key
        JavaPairRDD<String, Integer> wordReduced = wordTuple.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // Sorting: sortByKey sorts by key, so swap the k/v order of wordReduced first
        JavaPairRDD<Integer, String> wordSwaped = wordReduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                return tp.swap();
            }
        });
        // Sort descending; sortByKey is a transformation that returns a new RDD,
        // so its result must be captured rather than discarded
        JavaPairRDD<Integer, String> wordSorted = wordSwaped.sortByKey(false);
        // Swap the order back
        JavaPairRDD<String, Integer> result = wordSorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                return tp.swap();
            }
        });
        // Save the result to HDFS
        result.saveAsTextFile(args[1]);
        // Release resources
        jsc.close();
    }
}

2.2 Build and package

2.3 Run the program

[caimh@master-node spark-2.1.1]$ bin/spark-submit \
> --master yarn-cluster \
> --class com.caimh.spark.JavaWordCount \
> /opt/module/spark-2.1.1/sparktest-1.0-SNAPSHOT.jar hdfs://master-node:9000/wordcount.txt hdfs://master-node:9000/wordcount

2.4 Check the results

View in the YARN web UI.
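In yarn-cluster mode the driver runs inside YARN, so its output is not printed to the local console; if log aggregation is enabled, the application logs can be fetched with (the <application_id> placeholder comes from the YARN UI):

yarn logs -applicationId <application_id>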

View the output in HDFS, the same way as in section 1.7.

 
