赞
踩
目录
3.12 使用combineByKey()方法合并相同键的值
2.3 Scala插件(版本要与IDEA版本保持一致,下载2019.2.3版本)的下载安装
2.6 鼠标点击java文件夹,右键new--->Scala Class
2.7 准备好测试文件words.txt,将文件存放在scalaproject-->data-->input-->words.txt
5.2 WordCount.scala文件在yarm上运行:
选择“Libraries”选项,单击“+”按钮,选择“Java”选项
在弹出的界面中找到Spark安装目录下的jars文件夹,将整个文件夹导入,如图所示
点击“OK”
任何Spark程序都是以SparkContext对象开始的,因为SparkContext是Spark应用程序的上下文和入口,无论是Scala、Python、R程序,都是通过SparkContext对象的实例来创建RDD,Spark Shell中的sc就是SparkContext对象的实例。因此在实际Spark应用程序的开发中,在main方法中需要创建SparkContext对象,作为Spark应用程序的入口,并在Spark程序结束时关闭SparkContext对象。
初始化SparkContext需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数,属性参数是一种键值对的格式,一般可以通过set(属性名,属性设置值)的方法修改属性。其中还包含了设置程序名setAppName、设置运行模式setMaster等方法。如下图所示
SparkContext对象的实例创建完成后,就可以通过实例变量转化集合或者读取数据,计算过程中转化操作和行动操作的使用方法与在Shell环境中一致。
RDD是一个容错的、只读的、可进行并行操作的数据结构,是一个分布在集群各个节点中的存放元素的集合。RDD的创建有3种不同的方法。
第一种是将程序中已存在的Seq集合(如集合、列表、数组)转换成RDD。
第二种是对已有RDD进行转换得到新的RDD,这两种方法都是通过内存中已有的集合创建RDD的。
第三种是直接读取外部存储系统的数据创建RDD。
parallelize()方法有两个输入参数,说明如下。
要转化的集合,必须是Seq集合。Seq表示序列,指的是一类具有一定长度的、可迭代访问的对象,其中每个数据元素均带有一个从0开始的、固定的索引。
分区数。若不设分区数,则RDD的分区数默认为该程序分配到的资源的CPU核心数。
makeRDD()方法有两种使用方式:
第二种方式生成的RDD中保存的是T的值,Seq[String]部分的数据会按照Seq[(T,Seq[String])]的顺序存放到各个分区中,一个Seq[String]对应存放至一个分区,并为数据提供位置信息,通过preferredLocations()方法可以根据位置信息查看每一个分区的值。调用makeRDD()时不可以直接指定RDD的分区个数,分区的个数与Seq[String]参数的个数是保持一致的。
从外部存储系统中读取数据创建RDD是指直接读取存放在文件系统中的数据文件创建RDD。
从内存中读取数据创建RDD的方法常用于测试,从外部存储系统中读取数据创建RDD才是用于实践操作的常用方法。
从外部存储系统中读取数据创建RDD可以有很多种数据来源,可通过SparkContext对象的textFile()方法读取数据集,该方法支持多种类型的数据集,如目录、文本文件、压缩文件和通配符匹配的文件等,并且允许设定分区个数。
分别读取HDFS文件和Linux本地文件的数据并创建RDD,具体操作如下。
通过HDFS文件创建RDD:
直接通过textFile()方法读取HDFS文件的位置即可。
通过Linux本地文件创建RDD:
本地文件的读取也是通过sc.textFile("路径")的方法实现的,在路径前面加上“file://”表示从Linux本地文件系统读取。在IntelliJ IDEA开发环境中可以直接读取本地文件;但在spark-shell中,要求在所有节点的相同位置保存该文件才可以读取它。
RDD提供了丰富的操作方法用于操作分布式的数据集合,包括转换操作和行动操作两部分。
转换操作可以将一个RDD转换为一个新的RDD,但是转换操作是懒操作,不会立刻执行计算。
行动操作是用于触发转换操作的操作,这时才会真正开始进行计算。
map()方法是一种基础的RDD转换操作,可以对RDD中的每一个数据元素通过某种函数进行转换并返回新的RDD。
map()方法是转换操作,不会立即进行计算。
转换操作是创建RDD的第二种方法,通过转换已有RDD生成新的RDD。因为RDD是一个不可变的集合,所以如果对RDD数据进行了某种转换,那么会生成一个新的RDD。
sortBy()方法用于对标准RDD进行排序,有3个可输入参数,说明如下。
第一个参数是必须输入的,而后面的两个参数可以不输入。
collect()方法是一种行动操作,可以将RDD中所有元素转换成数组并返回到Driver端,适用于返回处理后的少量数据。
因为需要从集群各个节点收集数据到本地,经过网络传输,并且加载到Driver内存中,所以如果数据量比较大,会给网络传输造成很大的压力。
因此,数据量较大时,尽量不使用collect()方法,否则可能导致Driver端出现内存溢出问题。
collect()方法有以下两种操作方式:
collect[U: ClassTag](f: PartialFunction[T, U]):RDD[U]。这种方式需要提供一个标准的偏函数,将元素保存至一个RDD中。首先定义一个函数one,用于将collect方法得到的数组中数值为1的值替换为“one”,将其他值替换为“other”。
flatMap()方法将函数参数应用于RDD之中的每一个元素,将返回的迭代器(如数组、列表等)中的所有元素构成新的RDD。
使用flatMap()方法时先进行map(映射)再进行flat(扁平化)操作,数据会先经过跟map一样的操作,为每一条输入返回一个迭代器(可迭代的数据类型),然后将所得到的不同级别的迭代器中的元素全部当成同级别的元素,返回一个元素级别全部相同的RDD。
这个转换操作通常用来切分单词。
take(N)方法用于获取RDD的前N个元素,返回数据为数组。
take()与collect()方法的原理相似,collect()方法用于获取全部数据,take()方法获取指定个数的数据。
获取RDD的前5个元素
union()方法是一种转换操作,用于将两个RDD合并成一个,不进行去重操作,而且两个RDD中每个元素中的值的个数、数据类型需要保持一致。
使用union()方法合并两个RDD
filter()方法是一种转换操作,用于过滤RDD中的元素。
filter()方法需要一个参数,这个参数是一个用于过滤的函数,该函数的返回值为Boolean类型。
filter()方法将返回值为true的元素保留,将返回值为false的元素过滤掉,最后返回一个存储符合过滤条件的所有元素的新RDD。
创建一个RDD,并且过滤掉每个元组第二个值小于等于1的元素。
distinct()方法是一种转换操作,用于RDD的数据去重,去除两个完全相同的元素,没有参数。
创建一个带有重复数据的RDD,并使用distinct()方法去重。
Spark中的集合操作常用方法(转换操作)
intersection()方法用于求出两个RDD的共同元素,即找出两个RDD的交集,参数是另一个RDD,先后顺序与结果无关。
创建两个RDD,其中有相同的元素,通过intersection()方法求出两个RDD的交集。
subtract()方法用于将前一个RDD中在后一个RDD出现的元素删除,可以认为是求补集的操作,返回值为前一个RDD去除与后一个RDD相同元素后的剩余值所组成的新的RDD。两个RDD的顺序会影响结果。
创建两个RDD,分别为rdd1和rdd2,包含相同元素和不同元素,通过subtract()方法求rdd1和rdd2彼此的补集。
cartesian()方法可将两个集合的元素两两组合成一组,即求笛卡儿积。
创建两个RDD,分别有4个元素,通过cartesian()方法求两个RDD的笛卡儿积。
Spark的大部分RDD操作都支持所有种类的单值RDD,但是有少部分特殊的操作只能作用于键值对类型的RDD。
顾名思义,键值对RDD由一组组的键值对组成,这些RDD被称为PairRDD。PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口。
例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD。
有很多种创建键值对RDD的方式,很多存储键值对的数据格式会在读取时直接返回由其键值对组成的PairRDD。
当需要将一个普通的RDD转化为一个PairRDD时可以使用map函数来进行操作,传递的函数需要返回键值对。
键值对RDD,包含键和值两个部分。
Spark提供了两种方法,分别获取键值对RDD的键和值。
当数据集以键值对形式展现时,合并统计键相同的值是很常用的操作。
reduceByKey()方法用于合并具有相同键的值,作用对象是键值对,并且只对每个键的值进行处理,当RDD中有多个键相同的键值对时,则会对每个键对应的值进行处理。
reduceByKey()方法需要接收一个输入函数,键值对RDD相同键的值会根据函数进行合并并且创建一个新的RDD作为返回结果。
在进行处理时,reduceByKey()方法将相同键的前两个值传给输入函数,产生一个新的返回值,新产生的返回值与RDD中相同键的下一个值组成两个元素,再传给输入函数,直到最后每个键只有一个对应的值为止。reduceByKey()方法不是一种行动操作,而是一种转换操作。
groupByKey()方法用于对具有相同键的值进行分组,可以对同一组的数据进行计数、求和等操作。
对于一个由类型K的键和类型V的值组成的RDD,通过groupByKey()方法得到的RDD类型是[K,Iterable[V]]。
leftOuterJoin()方法用于根据键对两个RDD进行左外连接,与rightOuterJoin()方法相反,返回结果保留左边RDD的所有键。
fullOuterJoin()方法用于对两个RDD进行全外连接,保留两个RDD中所有键的连接结果。
lookup(key:K)方法作用于键值对RDD,返回指定键的所有值。
Spark支持的一些常见文件格式
通过textFile()方法即可直接读取,一条记录(一行)作为一个元素。
RDD数据可以直接调用saveAsTextFile()方法将数据存储为文本文件。
RDD分布式对象集合,本质上是一个只读的分区记录集合,不能直接修改,通过转换得到新的RDD。
在RDD的执行过程中,真正的计算发生在行动操作中,在前面的所有转换,spark只是记录下转换操作应用的一些基础数据集和RDD生成轨迹,不会触发计算。
优点:惰性调用、管道化、避免同步等待、不需要保存中间结果、每次操作变得简单、高效的容错性、存放的数据可以是JAVA对象
上述对RDD概念、依赖关系和Stage划分的介绍,结合之前介绍的Spark运行基本流程,再总结一下RDD在Spark架构中的运行过程:
Hadoop虽然已经成为大数据技术的事实标准,但其本身存在很多缺陷。比如,mapreduce计算模型延迟过高,无法实现实时快速计算的需求,只适用于离线批处理,I/O磁盘开销大。
spark在借鉴mapreduce优点同时,很好解决了mapreduce存在的缺陷:
可以在官网下载安装社区版本:
IntelliJ IDEA – the Leading Java and Kotlin IDE (jetbrains.com)
在File菜单->Settings->Plugins 插件安装界面搜索scala插件安装。
(我选择的版本是2.12.15)安装及环境变量的配置
官方下载地址:The Scala Programming Language (scala-lang.org)
双击打开下载好的安装程序,一直“Next”即可,最好不要安装到C盘,中间修改一下安装路径即可,最后点击“Finish”。我将scala软件安装在了D盘目录下的Develop文件夹,bin路径如下:
配置scala的系统环境变量,将scala安装的bin目录路径加入到系统环境变量path中:
win+R打开命令窗口输入:scala -verison ,进行检测是否成功配置环境变量
官方下载: Versions: Scala Plugin for IntelliJ IDEA & Android Studio | JetBrains Marketplace
下载完成后,将下载的压缩包解压到IDEA安装目录下的plugins目录下
在WordCount文件中编写如下代码:
- import org.apache.spark.sql.SparkSession
- object WordCount {
- def main(args: Array[String]): Unit = {
- val spark = SparkSession
- .builder()
- .master("local[*]")
- .appName("word count")
- .getOrCreate()
- val sc = spark.sparkContext
- val rdd = sc.textFile("data/input/words.txt")
- val counts = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
- counts.collect().foreach(println)
- println("全部的单词数:"+counts.count())
- counts.saveAsTextFile("data/output/word-count")
- }
- }
运行WordCount程序:
运行结果:
管理spark程序依赖jar,此时要能上网,在pom.xml文件中,添加如下配置信息
- <repositories>
- <repository>
- <id>aliyun</id>
- <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
- </repository>
- <repository>
- <id>apache</id>
- <url>https://repository.apache.org/content/repositories/snapshots/</url>
- </repository>
- <repository>
- <id>cloudera</id>
- <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
- </repository>
- </repositories>
- <properties>
- <encoding>UTF-8</encoding>
- <maven.compiler.source>1.8</maven.compiler.source>
- <maven.compiler.target>1.8</maven.compiler.target>
- <scala.version>2.12.10</scala.version>
- <spark.version>3.0.1</spark.version>
- <hadoop.version>2.7.7</hadoop.version>
- </properties>
- <dependencies>
- <!--依赖Scala语言-->
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- </dependency>
-
- <!--SparkCore依赖-->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.12</artifactId>
- <version>${spark.version}</version>
- </dependency>
-
- <!--SparkSQL依赖-->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.12</artifactId>
- <version>${spark.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>2.7.5</version>
- </dependency>
-
- <dependency>
- <groupId>com.hankcs</groupId>
- <artifactId>hanlp</artifactId>
- <version>portable-1.7.7</version>
- </dependency>
-
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>1.18.2</version>
- <scope>provided</scope>
- </dependency>
- </dependencies>
-
- <build>
- <sourceDirectory>src/main/scala</sourceDirectory>
- <plugins>
- <!-- 指定编译java的插件 -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.5.1</version>
- </plugin>
- <!-- 指定编译scala的插件 -->
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.2.2</version>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- <goal>testCompile</goal>
- </goals>
- <configuration>
- <args>
- <arg>-dependencyfile</arg>
- <arg>${project.build.directory}/.scala_dependencies</arg>
- </args>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.18.1</version>
- <configuration>
- <useFile>false</useFile>
- <disableXmlReport>true</disableXmlReport>
- <includes>
- <include>**/*Test.*</include>
- <include>**/*Suite.*</include>
- </includes>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.3</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- <transformers>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass></mainClass>
- </transformer>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
刷新maven工程,会自动下载所需依赖jar,此时会下载时间较长,耐心等待
WordCount .scala文件实现单词计数关键代码的解析如下:
-
-
- //1.env/准备sc/SparkContext/Spark上下文执行环境
- val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
- val sc: SparkContext = new SparkContext(conf)
- sc.setLogLevel("WARN")
-
-
- //2.source/读取数据
- val lines: RDD[String] = sc.textFile("data/input/words.txt")
- //(这里要特别注意一下,你自己电脑的目录下要保证有这个words.txt文件)
-
-
- //3.transformation/数据操作/转换
- //切割:RDD[一个个的单词]
- val words: RDD[String] = lines.flatMap(_.split(" "))
- //记为1:RDD[(单词, 1)]
- val wordAndOnes: RDD[(String, Int)] = words.map((_,1))
- val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_)
-
-
- //4.输出
- //直接输出
- result.foreach(println)
-
- //输出到指定path(可以是文件/夹)
- result.repartition(1).saveAsTextFile("data/output/result")
-
- //为了便于查看Web-UI可以让程序
- Thread.sleep(1000 * 60)
-
- //5.关闭资源
- sc.stop()
参数名 | 参数说明 |
--master | master 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local |
--deploy-mode | 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client |
--class | 应用程序的主类,仅针对 java 或 scala 应用 |
--name | 应用程序的名称 |
--jars | 用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下 |
--packages | 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标 |
--exclude-packages | 为了避免冲突 而指定不包含的 package |
--repositories | 远程 repository |
--conf PROP=VALUE | 指定 spark 配置属性的值, 例如-conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m" |
--properties-file | 加载的配置文件,默认为 conf/spark-defaults.conf |
--driver-memory | Driver内存,默认 1G |
--driver-java-options | 传给 driver 的额外的 Java 选项 |
--driver-library-path | 传给 driver 的额外的库路径 |
--driver-class-path | 传给 driver 的额外的类路径 |
--driver-cores | Driver 的核数,默认是1。在 yarn 或者 standalone 下使用 |
--executor-memory | 每个 executor 的内存,默认是1G |
--total-executor-cores | 所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用 |
--num-executors | 启动的 executor 数量。默认为2。在 yarn 下使用 |
--executor-core | 每个 executor 的核数。在yarn或者standalone下使用 |
- package net.objet
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object WordCount {
- def main(args: Array[String]): Unit = {
- //1.env/准备sc/SparkContext/Spark上下文执行环境
- val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
- val sc: SparkContext = new SparkContext(conf)
- sc.setLogLevel("WARN")
-
- //2.source/读取数据
- val lines: RDD[String] = sc.textFile("data/input/words.txt")
-
- //3.transformation/数据操作/转换
- //切割:RDD[一个个的单词]
- val words: RDD[String] = lines.flatMap(_.split(" "))
- //记为1:RDD[(单词, 1)]
- val wordAndOnes: RDD[(String, Int)] = words.map((_,1))
- val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_)
- //4.输出
- //直接输出
- result.foreach(println)
- //输出到指定path(可以是文件/夹)
- result.repartition(1).saveAsTextFile("data/output/result")
-
- //为了便于查看Web-UI可以让程序
- Thread.sleep(1000 * 60)
- //5.关闭资源
- sc.stop()
- }
-
- }
- package net
-
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object WordCount {
- def main(args: Array[String]): Unit = {
-
- if(args.length < 2){
- println("请指定input和output")
- System.exit(1)//非0表示非正常退出程序
- }
-
- //1.env/准备sc/SparkContext/Spark上下文执行环境
- val conf: SparkConf = new SparkConf().setAppName("wc") //.setMaster("local[*]")
- val sc: SparkContext = new SparkContext(conf)
- sc.setLogLevel("WARN")
-
- //2.source/读取数据
- val lines: RDD[String] = sc.textFile(args(0))
-
- //3.transformation/数据操作/转换
- //切割:RDD[一个个的单词]
- val words: RDD[String] = lines.flatMap(_.split(" "))
- //记为1:RDD[(单词, 1)]
- val wordAndOnes: RDD[(String, Int)] = words.map((_,1))
- val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_)
- //4.输出
- //直接输出
- result.foreach(println)
- //输出到指定path(可以是文件/夹)
- result.repartition(1).saveAsTextFile(args(1))
-
- //为了便于查看Web-UI可以让程序Thread.sleep(1000 * 60)
- //5.关闭资源
- sc.stop()
- }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。