The basic workflow: create an RDD, then operate on it by calling its methods. These methods fall into two categories: Transformations (lazy, they only describe the computation) and Actions (which actually trigger execution of the job).
Note that the methods on an RDD are not the same as Scala's native collection methods, even when they share names.
Write the program, package it, and submit it to the cluster to run.
To run a Spark program in local mode, call .setMaster("local[*]") on the SparkConf.
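A minimal sketch of these ideas, assuming a local run (the object name and the file local-wordcount.txt are placeholders, not from the original post):

import org.apache.spark.{SparkConf, SparkContext}

object LocalWordCountSketch {
  def main(args: Array[String]): Unit = {
    // local[*] runs Spark inside the current JVM, using all available cores
    val conf = new SparkConf().setAppName("LocalWordCountSketch").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Transformations: lazy, they only build up the lineage, nothing runs yet
    val counts = sc.textFile("local-wordcount.txt") // placeholder input path
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    // Action: this call triggers the actual job and returns the results to the driver
    counts.collect().foreach(println)

    sc.stop()
  }
}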
1.1 Configure the pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.caimh.spark</groupId>
    <artifactId>sparktest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
        <hadoop.version>2.6.5</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <!-- Scala dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Spark dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Pin the hadoop-client API version -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <!-- Plugin that compiles Scala sources -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- Plugin that compiles Java sources -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- Plugin that builds the (shaded) jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
1.2 Write the Spark program (Scala)
package com.caimh.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by caimh on 2019/10/28.
  */
object ScalaWordCount {

  def main(args: Array[String]): Unit = {
    // 1. Create the Spark configuration and set the application name
    val conf = new SparkConf().setAppName("ScalaWordCount")
    // 2. Create the entry point for Spark execution
    val sc = new SparkContext(conf)
    // 3. Specify where to read the data from and create an RDD (Resilient Distributed Dataset)
    val lines: RDD[String] = sc.textFile(args(0))
    // Split and flatten
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // Pair each word with 1
    val wordmap: RDD[(String, Int)] = words.map((_, 1))
    // Aggregate by key
    val wordreduced: RDD[(String, Int)] = wordmap.reduceByKey(_ + _)
    // Sort by count, descending
    val wordsorted: RDD[(String, Int)] = wordreduced.sortBy(_._2, false)
    // Save the result to HDFS
    wordsorted.saveAsTextFile(args(1))
    // Release resources
    sc.stop()
  }

}
1.3 Package with Maven
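If you build from the command line instead of the IDE, the usual Maven invocation (run from the project root) is:

mvn clean package

With the shade plugin configured above, the runnable jar is produced under target/ as sparktest-1.0-SNAPSHOT.jar.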
1.4 Take the successfully built jar and upload it to a node of the Spark cluster
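One way to do the upload (a sketch; the user, host, and target directory are taken from the shell prompt and jar path used later in this post):

scp target/sparktest-1.0-SNAPSHOT.jar caimh@master-node:/opt/module/spark-2.1.1/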
1.5 Start the cluster first
If Spark is set up as a Standalone cluster, start both the Spark cluster and the HDFS cluster.
If it is set up to run on YARN, start the HDFS and YARN clusters.
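For the HDFS/YARN case, a typical startup sequence (a sketch, assuming the Hadoop sbin scripts are on the PATH of the node that hosts the NameNode/ResourceManager) is:

start-dfs.sh
start-yarn.sh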
My setup runs on YARN; jps on the master node shows the relevant daemons:
[caimh@master-node spark-2.1.1]$ jps
46368 Jps
44469 ResourceManager
45048 QuorumPeerMain
44056 NameNode
44585 NodeManager
44170 DataNode
1.6 Submit the Spark application with the spark-submit command (mind the order of the arguments)
[caimh@master-node spark-2.1.1]$ bin/spark-submit \
> --master yarn-cluster \
> --class com.caimh.spark.ScalaWordCount \
> /opt/module/spark-2.1.1/sparktest-1.0-SNAPSHOT.jar hdfs://master-node:9000/wordcount.txt hdfs://master-node:9000/wordcount
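Note: on Spark 2.x, --master yarn-cluster still works but is deprecated; the equivalent current form (a sketch) is:

bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.caimh.spark.ScalaWordCount \
  /opt/module/spark-2.1.1/sparktest-1.0-SNAPSHOT.jar hdfs://master-node:9000/wordcount.txt hdfs://master-node:9000/wordcount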
1.7 Check the program's output
View in YARN
View in HDFS
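The same checks can also be done from the command line (a sketch). Keep in mind that saveAsTextFile fails if the output directory already exists, so delete it before re-running the job:

yarn application -list -appStates FINISHED
hdfs dfs -ls /wordcount
hdfs dfs -cat /wordcount/part-*
hdfs dfs -rm -r /wordcount    # only when you want to re-run the job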
1.2 Write the Spark program (Java)
package com.caimh.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * Created by caimh on 2019/10/29.
 */
public class JavaWordCount {

    public static void main(String[] args) {

        // Create the configuration
        SparkConf conf = new SparkConf().setAppName("JavaWordCount");
        // Create the SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Specify where to read the data from
        JavaRDD<String> lines = jsc.textFile(args[0]);
        // Split and flatten (functional style)
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        // Pair each word with 1
        JavaPairRDD<String, Integer> wordTuple = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });
        // Aggregate by key
        JavaPairRDD<String, Integer> wordReduced = wordTuple.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // Sort (sortByKey sorts by key, so the k and v of wordReduced have to be swapped first)
        // Swap the order
        JavaPairRDD<Integer, String> wordSwaped = wordReduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
                return tp.swap();
            }
        });
        // Sort descending; RDDs are immutable, so keep the RDD returned by sortByKey
        JavaPairRDD<Integer, String> wordSorted = wordSwaped.sortByKey(false);
        // Swap the order back
        JavaPairRDD<String, Integer> result = wordSorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
                return tp.swap();
            }
        });
        // Save the data to HDFS
        result.saveAsTextFile(args[1]);
        // Release resources
        jsc.close();
    }
}
1.3 Compile and package
1.4 Run the program
[caimh@master-node spark-2.1.1]$ bin/spark-submit \
--master yarn-cluster \
--class com.caimh.spark.JavaWordCount \
/opt/module/spark-2.1.1/sparktest-1.0-SNAPSHOT.jar hdfs://master-node:9000/wordcount.txt hdfs://master-node:9000/wordcount
1.5 Check the results
View in YARN
View in HDFS