赞
踩
我们全都要从前辈和同辈学习到一些东西。就连最大的天才,如果想单凭他所特有的内在自我去对付一切,他也决不会有多大成就。
Spark-Shell通常在测试和验证我们的程序时使用的较多,然而在生产环境中,通常会在IDEA开发工具中编写程序,然后打成Jar包,最后提交到集群中执行。本节我们将利用IDEA工具开发一个WordCount单词计数程序。
Spark作业与MapReduce作业同样可以先在本地开发测试,本地执行模式与集群提交模式,代码的业务功能相同,因此本书大多数采用本地开发模式。下面讲解使用IDEA工具开发WordCount单词计数程序的相关步骤。
如下图所示:
Maven是一个项目管理工具,虽然我们刚才创建好了项目,但是却不能识别Spark类,因此,我们需要将Spark相关的依赖添加到Maven项目中。打开pom.xml文件,在该文件中添加相关依赖如下所示:
<!--设置依赖版本号--> <properties> <scala.version>2.11.8</scala.version> <hadoop.version>2.7.4</hadoop.version> <spark.version>2.3.2</spark.version> </properties> <dependencies> <!--Scala--> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!--Spark--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <!--Hadoop--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies>
package cn.itcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { //1:创建SparkConf对象,设置appName和Master地址 val sparkConf = new SparkConf().setAppName("Wordcount").setMaster("local[2]") //2:创建SparkContext对象,他是所有任务计算的源头 val sparkContext = new SparkContext(sparkConf) //3:读取数据文件,RDD简单理解为一个集合 val data: RDD[String] = sparkContext.textFile("c:\\word\\words.txt") //4:切分乱每一行,获取所有的单词,单词之间以空格隔开 val words: RDD[String] = data.flatMap(_.split(" ")) //5:每个单词标记为1,转换为(单词,1) val wordAndOne: RDD[(String, Int)] = words.map(x => (x, 1)) //6:相同的单词汇总,前一个下划线表示累加数据,后一个下划线表示新数据 (x,y)=>x+y val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _) //7:收集打印结果数据 val finalResult: Array[(String, Int)] = result.collect() println(finalResult.toBuffer) //8:关闭sparkContext对象 sparkContext.stop() } }
文件 c:\word\words.txt内容如下:
hello hadoop
hello spark
hello itcast
运行结果如下所示:
集群模式是指将Spark程序提交至Spark集群中执行任务,由Spark集群负责资源的调度,程序会被框架分发到集群中的节点上并发地执行。下面分步骤介绍如何在集群模式下执行Spark程序。
<build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
注意:如果在创建Maven工程中选择Scala原型模板,上述插件会自动创建。这些插件的主要功能是方便开发人员进行打包。
package cn.itcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} //编写单词计数程序,打成Jar包,提交到集群中运行 object WordCount_Online { def main(args: Array[String]): Unit = { //1:创建SparkConf对象,设置appName和Master地址 val sparkConf = new SparkConf().setAppName("Wordcount_Online") //2:创建SparkContext对象,他是所有任务计算的源头 val sparkContext = new SparkContext(sparkConf) //3:读取数据文件(path是HDFS上的路径,以传参的方式),RDD简单理解为一个集合 val data: RDD[String] = sparkContext.textFile(args(0)) //4:切分乱每一行,获取所有的单词,单词之间以空格隔开 val words: RDD[String] = data.flatMap(_.split(" ")) //5:每个单词标记为1,转换为(单词,1) val wordAndOne: RDD[(String, Int)] = words.map(x => (x, 1)) //6:相同的单词汇总,前一个下划线表示累加数据,后一个下划线表示新数据 (x,y)=>x+y val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _) //7:将结果数据保存到HDFS上 result.saveAsTextFile(args(1)) //8:关闭sparkContext对象 sparkContext.stop() } }
spark-submit
”spark-submit
" 命令提交任务,命令如下。bin/spark-submit --master spark://hadoop01:7077 \
--class cn.itcast.WordCount_Online \
--executor-memory 1g \
--total-executor-cores 1 \
/export/data/spark_chapter02-1.0-SNAPSHOT.jar \
/spark/test/words.txt \
/spark/test/out
结果如下图所示:
转载自:https://blog.csdn.net/u014727709/article/details/132509490
欢迎
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。