当前位置:   article > 正文

Spark--Spark编程基础和编程进阶知识总结(第三章和第四章)

Spark--Spark编程基础和编程进阶知识总结(第三章和第四章)

目录

Spark编程基础

一、配置Spark运行环境

1.1 Spark安装步骤

1.2 编写Spark程序

1.3 从内存中读取数据创建RDD

1.3.1 parallelize()

1.3.2 makeRDD()

1.4 从外部存储系统中读取数据创建RDD

二、RDD方法

2.1 使用map()方法转换数据

2.2 使用sortBy()方法进行排序

2.3 使用collect()方法查询数据

2.4 使用collect()方法查询数据

2.5 使用flatMap()方法转换数据

2.6 使用take()方法查询某几个值

2.7 使用union()方法合并多个RDD

2.8 使用filter()方法进行过滤

2.9 使用distinct()方法进行去重

三、使用简单的集合操作

3.1 intersection()方法

3.2 subtract()方法

3.3 cartesian()方法

3.4 了解键值对RDD

3.5 创建键值对RDD

3.6 使用键值对RDD的keys和values方法

3.7 使用键值对RDD的reduceByKey()方法

3.8 使用键值对RDD的reduceByKey()方法

3.9 使用键值对RDD的groupByKey()方法

3.10 使用join()方法连接两个RDD

3.10.1 join()方法

3.10.2 rightOuterJoin()方法

3.10.3 leftOuterJoin()方法

3.10.4 fullOuterJoin()方法

3.11 使用zip()方法组合两个RDD

3.12 使用combineByKey()方法合并相同键的值

3.13 使用lookup()方法查找指定键的值

四、读取与存储JSON文件

4.1 JSON文件的读取

4.2 JSON文件的存储

五、读取与存储CSV文件

5.1 CSV文件的读取

5.2 CSV文件的存储

六、读取与存储SequenceFile文件

6.1 SequenceFile文件的存储

6.2 SequenceFile文件的读取

七、读取与存储文本文件

7.1 文本文件的读取

7.2 文本文件的存储

扩展:RDD

一.核心-RDD

1.1 设计背景

1.2 RDD概念

1.3 RDD特性

1.4 RDD之间的依赖关系

1.5 RDD运行过程

Spark编程进阶

一、Hadoop与spark区别

二、安装IDEA

2.1 安装Scala

2.2 Scala下载

2.3 Scala插件(版本要与IDEA版本保持一致,下载2019.2.3版本)的下载安装

2.4 检测Scala插件是否在IDEA中已经安装成功

2.5 新建scala类文件编写代码

2.6 鼠标点击java文件夹,右键new--->Scala Class

2.7 准备好测试文件words.txt,将文件存放在scalaproject-->data-->input-->words.txt

三、编写本地运行的spark程序

3.1 编写pom.xml 文件

3.2 编写代码

四、spark-submit 详细参数说明

 五、完整代码实现

5.1 WordCount.scala文件在本地运行:

5.2 WordCount.scala文件在yarm上运行:


Spark编程基础

一、配置Spark运行环境

1.1 Spark安装步骤

  • 选择“Libraries”选项,单击“+”按钮,选择“Java”选项

  • 在弹出的界面中找到Spark安装目录下的jars文件夹,将整个文件夹导入,如图所示

  • 点击“OK

1.2 编写Spark程序

任何Spark程序都是以SparkContext对象开始的,因为SparkContextSpark应用程序的上下文和入口,无论是ScalaPythonR程序,都是通过SparkContext对象的实例来创建RDDSpark Shell中的sc就是SparkContext对象的实例。因此在实际Spark应用程序的开发中,在main方法中需要创建SparkContext对象,作为Spark应用程序的入口,并在Spark程序结束时关闭SparkContext对象。

  • 初始化SparkContext需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数,属性参数是一种键值对的格式,一般可以通过set(属性名,属性设置值)的方法修改属性。其中还包含了设置程序名setAppName、设置运行模式setMaster等方法。如下图所示

  • SparkContext对象的实例创建完成后,就可以通过实例变量转化集合或者读取数据,计算过程中转化操作和行动操作的使用方法与在Shell环境中一致。

1.3 从内存中读取数据创建RDD

RDD是一个容错的、只读的、可进行并行操作的数据结构,是一个分布在集群各个节点中的存放元素的集合。RDD的创建有3种不同的方法。

  • 第一种是将程序中已存在的Seq集合(如集合、列表、数组)转换成RDD

  • 第二种是对已有RDD进行转换得到新的RDD,这两种方法都是通过内存中已有的集合创建RDD的。

  • 第三种是直接读取外部存储系统的数据创建RDD

1.3.1 parallelize()

parallelize()方法有两个输入参数,说明如下。

  • 要转化的集合,必须是Seq集合。Seq表示序列,指的是一类具有一定长度的、可迭代访问的对象,其中每个数据元素均带有一个从0开始的、固定的索引。

  • 分区数。若不设分区数,则RDD的分区数默认为该程序分配到的资源的CPU核心数。

1.3.2 makeRDD()

  • makeRDD()方法有两种使用方式:

  1. 第一种方式的使用与parallelize()方法一致;
  2. 第二种方式是通过接收一个是Seq[(T,Seq[String])]参数类型创建RDD
  • 第二种方式生成的RDD中保存的是T的值,Seq[String]部分的数据会按照Seq[(T,Seq[String])]的顺序存放到各个分区中,一个Seq[String]对应存放至一个分区,并为数据提供位置信息,通过preferredLocations()方法可以根据位置信息查看每一个分区的值。调用makeRDD()时不可以直接指定RDD的分区个数,分区的个数与Seq[String]参数的个数是保持一致的。

1.4 从外部存储系统中读取数据创建RDD

  • 从外部存储系统中读取数据创建RDD是指直接读取存放在文件系统中的数据文件创建RDD

  • 从内存中读取数据创建RDD的方法常用于测试,从外部存储系统中读取数据创建RDD才是用于实践操作的常用方法。

  • 从外部存储系统中读取数据创建RDD可以有很多种数据来源,可通过SparkContext对象的textFile()方法读取数据集,该方法支持多种类型的数据集,如目录、文本文件、压缩文件和通配符匹配的文件等,并且允许设定分区个数。

  • 分别读取HDFS文件和Linux本地文件的数据并创建RDD,具体操作如下。

通过HDFS文件创建RDD:

直接通过textFile()方法读取HDFS文件的位置即可。

通过Linux本地文件创建RDD:

本地文件的读取也是通过sc.textFile("路径")的方法实现的,在路径前面加上“file://”表示从Linux本地文件系统读取。在IntelliJ IDEA开发环境中可以直接读取本地文件;但在spark-shell中,要求在所有节点的相同位置保存该文件才可以读取它。

二、RDD方法

RDD提供了丰富的操作方法用于操作分布式的数据集合,包括转换操作和行动操作两部分。

  • 转换操作可以将一个RDD转换为一个新的RDD,但是转换操作是懒操作,不会立刻执行计算。

  • 行动操作是用于触发转换操作的操作,这时才会真正开始进行计算。

2.1 使用map()方法转换数据

  • map()方法是一种基础的RDD转换操作,可以对RDD中的每一个数据元素通过某种函数进行转换并返回新的RDD

  • map()方法是转换操作,不会立即进行计算。

  • 转换操作是创建RDD的第二种方法,通过转换已有RDD生成新的RDD。因为RDD是一个不可变的集合,所以如果对RDD数据进行了某种转换,那么会生成一个新的RDD

2.2 使用sortBy()方法进行排序

  • sortBy()方法用于对标准RDD进行排序,有3个可输入参数,说明如下。

  1. 1个参数是一个函数f:(T) => K,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。
  2. 2个参数是ascending,决定排序后RDD中的元素是升序的还是降序的,默认是true,即升序排序,如果需要降序排序那么需要将参数的值设置为false
  3. 3个参数是numPartitions,决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的分区个数相等,即this.partitions.size
  • 第一个参数是必须输入的,而后面的两个参数可以不输入。

2.3 使用collect()方法查询数据

  • collect()方法是一种行动操作,可以将RDD中所有元素转换成数组并返回到Driver端,适用于返回处理后的少量数据。

  • 因为需要从集群各个节点收集数据到本地,经过网络传输,并且加载到Driver内存中,所以如果数据量比较大,会给网络传输造成很大的压力。

  • 因此,数据量较大时,尽量不使用collect()方法,否则可能导致Driver端出现内存溢出问题。

2.4 使用collect()方法查询数据

collect()方法有以下两种操作方式:

  1. collect:直接调用collect返回该RDD中的所有元素,返回类型是一个Array[T]数组。

collect[U: ClassTag](f: PartialFunction[T, U])RDD[U]。这种方式需要提供一个标准的偏函数,将元素保存至一个RDD中。首先定义一个函数one,用于将collect方法得到的数组中数值为1的值替换为“one”,将其他值替换为“other”

2.5 使用flatMap()方法转换数据

  • flatMap()方法将函数参数应用于RDD之中的每一个元素,将返回的迭代器(如数组、列表等)中的所有元素构成新的RDD

  • 使用flatMap()方法时先进行map(映射)再进行flat(扁平化)操作,数据会先经过跟map一样的操作,为每一条输入返回一个迭代器(可迭代的数据类型),然后将所得到的不同级别的迭代器中的元素全部当成同级别的元素,返回一个元素级别全部相同的RDD

  • 这个转换操作通常用来切分单词。

2.6 使用take()方法查询某几个值

  • take(N)方法用于获取RDD的前N个元素,返回数据为数组。

  • take()collect()方法的原理相似,collect()方法用于获取全部数据,take()方法获取指定个数的数据。

  • 获取RDD的前5个元素

2.7 使用union()方法合并多个RDD

  • union()方法是一种转换操作,用于将两个RDD合并成一个,不进行去重操作,而且两个RDD中每个元素中的值的个数、数据类型需要保持一致。

  • 使用union()方法合并两个RDD

2.8 使用filter()方法进行过滤

  • filter()方法是一种转换操作,用于过滤RDD中的元素。

  • filter()方法需要一个参数,这个参数是一个用于过滤的函数,该函数的返回值为Boolean类型。

  • filter()方法将返回值为true的元素保留,将返回值为false的元素过滤掉,最后返回一个存储符合过滤条件的所有元素的新RDD

  • 创建一个RDD,并且过滤掉每个元组第二个值小于等于1的元素。

2.9 使用distinct()方法进行去重

  • distinct()方法是一种转换操作,用于RDD的数据去重,去除两个完全相同的元素,没有参数。

  • 创建一个带有重复数据的RDD,并使用distinct()方法去重。

三、使用简单的集合操作

Spark中的集合操作常用方法(转换操作)

3.1 intersection()方法

  • intersection()方法用于求出两个RDD的共同元素,即找出两个RDD的交集,参数是另一个RDD,先后顺序与结果无关。

  • 创建两个RDD,其中有相同的元素,通过intersection()方法求出两个RDD的交集。

3.2 subtract()方法

  • subtract()方法用于将前一个RDD中在后一个RDD出现的元素删除,可以认为是求补集的操作,返回值为前一个RDD去除与后一个RDD相同元素后的剩余值所组成的新的RDD。两个RDD的顺序会影响结果。

  • 创建两个RDD,分别为rdd1rdd2,包含相同元素和不同元素,通过subtract()方法求rdd1rdd2彼此的补集。

3.3 cartesian()方法

  • cartesian()方法可将两个集合的元素两两组合成一组,即求笛卡儿积。

  • 创建两个RDD,分别有4个元素,通过cartesian()方法求两个RDD的笛卡儿积。

3.4 了解键值对RDD

  • Spark的大部分RDD操作都支持所有种类的单值RDD,但是有少部分特殊的操作只能作用于键值对类型的RDD

  • 顾名思义,键值对RDD由一组组的键值对组成,这些RDD被称为PairRDDPairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口。

  • 例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD

3.5 创建键值对RDD

  • 有很多种创建键值对RDD的方式,很多存储键值对的数据格式会在读取时直接返回由其键值对组成的PairRDD

  • 当需要将一个普通的RDD转化为一个PairRDD时可以使用map函数来进行操作,传递的函数需要返回键值对。

3.6 使用键值对RDDkeysvalues方法

  • 键值对RDD,包含键和值两个部分。

  • Spark提供了两种方法,分别获取键值对RDD的键和值。

  1. keys方法返回一个仅包含键的RDD
  2. values方法返回一个仅包含值的RDD

3.7 使用键值对RDDreduceByKey()方法

  • 当数据集以键值对形式展现时,合并统计键相同的值是很常用的操作。

  • reduceByKey()方法用于合并具有相同键的值,作用对象是键值对,并且只对每个键的值进行处理,当RDD中有多个键相同的键值对时,则会对每个键对应的值进行处理。

  • reduceByKey()方法需要接收一个输入函数,键值对RDD相同键的值会根据函数进行合并并且创建一个新的RDD作为返回结果。

3.8 使用键值对RDDreduceByKey()方法

在进行处理时,reduceByKey()方法将相同键的前两个值传给输入函数,产生一个新的返回值,新产生的返回值与RDD中相同键的下一个值组成两个元素,再传给输入函数,直到最后每个键只有一个对应的值为止。reduceByKey()方法不是一种行动操作,而是一种转换操作。

3.9 使用键值对RDDgroupByKey()方法

  • groupByKey()方法用于对具有相同键的值进行分组,可以对同一组的数据进行计数、求和等操作。

  • 对于一个由类型K的键和类型V的值组成的RDD,通过groupByKey()方法得到的RDD类型是[K,Iterable[V]]

3.10 使用join()方法连接两个RDD

  • 将有键的一组数据与另一组有键的数据根据键进行连接,是对键值对数据常用的操作之一。
  • 与合并不同,连接会对键相同的值进行合并,连接方式多种多样,包含内连接、右外连接、左外连接、全外连接,不同的连接方式需要使用不同的连接方法。
  • 连接方法如下表。

3.10.1 join()方法

  • join()方法用于根据键对两个RDD进行内连接,将两个RDD中键相同的数据的值存放在一个元组中,最后只返回两个RDD中都存在的键的连接结果。
  • 例如,在两个RDD中分别有键值对(K,V)和(K,W),通过join()方法连接会返回(K,(V,W))。
  • 创建两个RDD,含有相同键和不同的键,通过join()方法进行内连接。

3.10.2 rightOuterJoin()方法

  • rightOuterJoin()方法用于根据键对两个RDD进行右外连接,连接结果是右边RDD的所有键的连接结果,不管这些键在左边RDD中是否存在。
  • 在rightOuterJoin()方法中,如果在左边RDD中有对应的键,那么连接结果中值显示为Some类型值;如果没有,那么显示为None值。

3.10.3 leftOuterJoin()方法

leftOuterJoin()方法用于根据键对两个RDD进行左外连接,与rightOuterJoin()方法相反,返回结果保留左边RDD的所有键。

3.10.4 fullOuterJoin()方法

fullOuterJoin()方法用于对两个RDD进行全外连接,保留两个RDD中所有键的连接结果。

3.11 使用zip()方法组合两个RDD

  • zip()方法用于将两个RDD组合成键值对RDD,要求两个RDD的分区数量以及元素数量相同,否则会抛出异常。
  • 将两个RDD组合成Key/Value形式的RDD,这里要求两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

3.12 使用combineByKey()方法合并相同键的值

  • combineByKey()方法是Spark中一个比较核心的高级方法,键值对的其他一些高级方法底层均是使用combineByKey()方法实现的,如groupByKey()方法、reduceByKey()方法等。
  • combineByKey()方法用于将键相同的数据聚合,并且允许返回类型与输入数据的类型不同的返回值。
  • combineByKey()方法的使用方式如下。

  • combineByKey()方法接收3个重要的参数,具体说明如下:
  1. createCombiner:V=>C,V是键值对RDD中的值部分,将该值转换为另一种类型的值C,C会作为每一个键的累加器的初始值。
  2. mergeValue:(C,V)=>C,该函数将元素V聚合到之前的元素C(createCombiner)上(这个操作在每个分区内进行)。
  3. mergeCombiners:(C,C)=>C,该函数将两个元素C进行合并(这个操作在不同分区间进行)。

  • 由于合并操作会遍历分区中所有的元素,因此每个元素(这里指的是键值对)的键只有两种情况:以前没出现过或以前出现过。对于这两种情况,3个参数的执行情况描述如下:
  1. 如果以前没出现过,则执行的是createCombiner()方法,createCombiner()方法会在新遇到的键对应的累加器中赋予初始值,否则执行mergeValue()方法。
  2. 对于已经出现过的键,调用mergeValue()方法进行合并操作,对该键的累加器对应的当前值(C)与新值(V)进行合并。
  3. 由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法对各个分区的结果(全是C)进行合并。

3.13 使用lookup()方法查找指定键的值

lookup(key:K)方法作用于键值对RDD,返回指定键的所有值。

四、读取与存储JSON文件

Spark支持的一些常见文件格式

4.1 JSON文件的读取

4.2 JSON文件的存储

五、读取与存储CSV文件

5.1 CSV文件的读取

5.2 CSV文件的存储

六、读取与存储SequenceFile文件

6.1 SequenceFile文件的存储

6.2 SequenceFile文件的读取

七、读取与存储文本文件

7.1 文本文件的读取

通过textFile()方法即可直接读取,一条记录(一行)作为一个元素。

7.2 文本文件的存储

RDD数据可以直接调用saveAsTextFile()方法将数据存储为文本文件。

扩展:RDD

    RDD分布式对象集合,本质上是一个只读的分区记录集合,不能直接修改,通过转换得到新的RDD。

    在RDD的执行过程中,真正的计算发生在行动操作中,在前面的所有转换,spark只是记录下转换操作应用的一些基础数据集和RDD生成轨迹,不会触发计算。

优点:惰性调用、管道化、避免同步等待、不需要保存中间结果、每次操作变得简单、高效的容错性、存放的数据可以是JAVA对象

一.核心-RDD

1.1 设计背景

  • 许多迭代式算法《比如机器学习、图算法等》和交互式数据挖掘工具,共同之处是,不同计算阶段之间会重用中间结果
  • 目前的MapReduce框架都是把中间结果写入到磁盘中,带来大量的数据复制、磁盘Io和序列化开销
  • RDD就是为了满足这种需求而出现的,它提供了一个抽象的数据结构
  • 我们不必担心底层数据的分布式持性,只需将具体的应用逻辑表达为一系列转换处理
  • 不同RDD之间的转换操作形成依赖关系,可以实现管道化,避免中间数据存储

1.2 RDD概念

  • 一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,不同节点上进行并行计算
  • RDD提供了一种高度受限的共享内存模型,RDD是只读的记录分区集合,不能直接修改,只能通过在转换的过程中改

1.3 RDD特性

  • 高效的容错性
  • 现有容错机制:数据复制或者记录日志RDD具有天生的容错性:血缘关系,重新计算丢失分区,无需回滚系统,重算过程在不同节点之间并行,只记录粗粒度的操作
  • 中间结果持久化到内存,数据在内存中的多个RDD操作直接按进行传递,避免了不必要的读写磁盘开销
  • 存放的数据可以是JAVA对象,避免了不必要的对象序列化和反序列化

1.4 RDD之间的依赖关系

  • 窄依赖指的是子RDD的一个分区只依赖于某个父RDD中的一个分区。
  • 宽依赖指的是子RDD的每一个分区都依赖于某个父RDD中一个以上的分区。
  • 理解宽、窄依赖的区别,需要先了解父RDD和子RDD。map()、filter()方法上方箭头左边的RDD是父RDD,而右边的RDD是子RDD。union()方法上方箭头左边的两个RDD均为右边RDD的父RDD, union()方法是有两个父RDD 的。

1.5 RDD运行过程

上述对RDD概念、依赖关系和Stage划分的介绍,结合之前介绍的Spark运行基本流程,再总结一下RDD在Spark架构中的运行过程:

  • 创建RDD对象
  • SparkContext负责计算RDD之间的依赖关系,构建DAG
  • DAGScheduler负责把DAG图分解成多个Stage,每个Stage中包含了多个Task,每个Task会被TaskScheduler分发给各个WorkerNode上的Executor去执行。

Spark编程进阶

一、Hadoop与spark区别

Hadoop虽然已经成为大数据技术的事实标准,但其本身存在很多缺陷。比如,mapreduce计算模型延迟过高,无法实现实时快速计算的需求,只适用于离线批处理,I/O磁盘开销大。

spark在借鉴mapreduce优点同时,很好解决了mapreduce存在的缺陷:

  1. spark计算也属于mapreduce计算,但不局限于map和reduce操作;
  2. spark提供内计算,中间结果放入内存,提高迭代运算效率;
  3. 基于DAG的任务调度执行机制,优于mapreduce调度机制。

二、安装IDEA

可以在官网下载安装社区版本:

IntelliJ IDEA – the Leading Java and Kotlin IDE (jetbrains.com)

2.1 安装Scala

在File菜单->Settings->Plugins 插件安装界面搜索scala插件安装。

2.2 Scala下载

(我选择的版本是2.12.15)安装及环境变量的配置

官方下载地址:The Scala Programming Language (scala-lang.org)

 双击打开下载好的安装程序,一直“Next”即可,最好不要安装到C盘,中间修改一下安装路径即可,最后点击“Finish”。我将scala软件安装在了D盘目录下的Develop文件夹,bin路径如下:

配置scala的系统环境变量,将scala安装的bin目录路径加入到系统环境变量path中:

win+R打开命令窗口输入:scala -verison ,进行检测是否成功配置环境变量

2.3 Scala插件(版本要与IDEA版本保持一致,下载2019.2.3版本)的下载安装

官方下载: Versions: Scala Plugin for IntelliJ IDEA & Android Studio | JetBrains Marketplace

下载完成后,将下载的压缩包解压到IDEA安装目录下的plugins目录下

2.4 检测Scala插件是否在IDEA中已经安装成功

2.5 新建scala类文件编写代码

2.6 鼠标点击java文件夹,右键new--->Scala Class

在WordCount文件中编写如下代码:

  1. import org.apache.spark.sql.SparkSession
  2. object WordCount {
  3. def main(args: Array[String]): Unit = {
  4. val spark = SparkSession
  5. .builder()
  6. .master("local[*]")
  7. .appName("word count")
  8. .getOrCreate()
  9. val sc = spark.sparkContext
  10. val rdd = sc.textFile("data/input/words.txt")
  11. val counts = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
  12. counts.collect().foreach(println)
  13. println("全部的单词数:"+counts.count())
  14. counts.saveAsTextFile("data/output/word-count")
  15. }
  16. }

2.7 准备好测试文件words.txt,将文件存放在scalaproject-->data-->input-->words.txt

运行WordCount程序:

运行结果:

三、编写本地运行的spark程序

3.1 编写pom.xml 文件

管理spark程序依赖jar,此时要能上网,在pom.xml文件中,添加如下配置信息

  1. <repositories>
  2. <repository>
  3. <id>aliyun</id>
  4. <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  5. </repository>
  6. <repository>
  7. <id>apache</id>
  8. <url>https://repository.apache.org/content/repositories/snapshots/</url>
  9. </repository>
  10. <repository>
  11. <id>cloudera</id>
  12. <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  13. </repository>
  14. </repositories>
  15. <properties>
  16. <encoding>UTF-8</encoding>
  17. <maven.compiler.source>1.8</maven.compiler.source>
  18. <maven.compiler.target>1.8</maven.compiler.target>
  19. <scala.version>2.12.10</scala.version>
  20. <spark.version>3.0.1</spark.version>
  21. <hadoop.version>2.7.7</hadoop.version>
  22. </properties>
  23. <dependencies>
  24. <!--依赖Scala语言-->
  25. <dependency>
  26. <groupId>org.scala-lang</groupId>
  27. <artifactId>scala-library</artifactId>
  28. <version>${scala.version}</version>
  29. </dependency>
  30. <!--SparkCore依赖-->
  31. <dependency>
  32. <groupId>org.apache.spark</groupId>
  33. <artifactId>spark-core_2.12</artifactId>
  34. <version>${spark.version}</version>
  35. </dependency>
  36. <!--SparkSQL依赖-->
  37. <dependency>
  38. <groupId>org.apache.spark</groupId>
  39. <artifactId>spark-sql_2.12</artifactId>
  40. <version>${spark.version}</version>
  41. </dependency>
  42. <dependency>
  43. <groupId>org.apache.hadoop</groupId>
  44. <artifactId>hadoop-client</artifactId>
  45. <version>2.7.5</version>
  46. </dependency>
  47. <dependency>
  48. <groupId>com.hankcs</groupId>
  49. <artifactId>hanlp</artifactId>
  50. <version>portable-1.7.7</version>
  51. </dependency>
  52. <dependency>
  53. <groupId>org.projectlombok</groupId>
  54. <artifactId>lombok</artifactId>
  55. <version>1.18.2</version>
  56. <scope>provided</scope>
  57. </dependency>
  58. </dependencies>
  59. <build>
  60. <sourceDirectory>src/main/scala</sourceDirectory>
  61. <plugins>
  62. <!-- 指定编译java的插件 -->
  63. <plugin>
  64. <groupId>org.apache.maven.plugins</groupId>
  65. <artifactId>maven-compiler-plugin</artifactId>
  66. <version>3.5.1</version>
  67. </plugin>
  68. <!-- 指定编译scala的插件 -->
  69. <plugin>
  70. <groupId>net.alchim31.maven</groupId>
  71. <artifactId>scala-maven-plugin</artifactId>
  72. <version>3.2.2</version>
  73. <executions>
  74. <execution>
  75. <goals>
  76. <goal>compile</goal>
  77. <goal>testCompile</goal>
  78. </goals>
  79. <configuration>
  80. <args>
  81. <arg>-dependencyfile</arg>
  82. <arg>${project.build.directory}/.scala_dependencies</arg>
  83. </args>
  84. </configuration>
  85. </execution>
  86. </executions>
  87. </plugin>
  88. <plugin>
  89. <groupId>org.apache.maven.plugins</groupId>
  90. <artifactId>maven-surefire-plugin</artifactId>
  91. <version>2.18.1</version>
  92. <configuration>
  93. <useFile>false</useFile>
  94. <disableXmlReport>true</disableXmlReport>
  95. <includes>
  96. <include>**/*Test.*</include>
  97. <include>**/*Suite.*</include>
  98. </includes>
  99. </configuration>
  100. </plugin>
  101. <plugin>
  102. <groupId>org.apache.maven.plugins</groupId>
  103. <artifactId>maven-shade-plugin</artifactId>
  104. <version>2.3</version>
  105. <executions>
  106. <execution>
  107. <phase>package</phase>
  108. <goals>
  109. <goal>shade</goal>
  110. </goals>
  111. <configuration>
  112. <filters>
  113. <filter>
  114. <artifact>*:*</artifact>
  115. <excludes>
  116. <exclude>META-INF/*.SF</exclude>
  117. <exclude>META-INF/*.DSA</exclude>
  118. <exclude>META-INF/*.RSA</exclude>
  119. </excludes>
  120. </filter>
  121. </filters>
  122. <transformers>
  123. <transformer
  124. implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  125. <mainClass></mainClass>
  126. </transformer>
  127. </transformers>
  128. </configuration>
  129. </execution>
  130. </executions>
  131. </plugin>
  132. </plugins>
  133. </build>

 刷新maven工程,会自动下载所需依赖jar,此时会下载时间较长,耐心等待

3.2 编写代码

WordCount .scala文件实现单词计数关键代码的解析如下:

  1. //1.env/准备sc/SparkContext/Spark上下文执行环境
  2. val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
  3. val sc: SparkContext = new SparkContext(conf)
  4. sc.setLogLevel("WARN")
  5. //2.source/读取数据
  6. val lines: RDD[String] = sc.textFile("data/input/words.txt")
  7. //(这里要特别注意一下,你自己电脑的目录下要保证有这个words.txt文件)
  8. //3.transformation/数据操作/转换
  9. //切割:RDD[一个个的单词]
  10. val words: RDD[String] = lines.flatMap(_.split(" "))
  11. //记为1:RDD[(单词, 1)]
  12. val wordAndOnes: RDD[(String, Int)] = words.map((_,1))
  13. val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_)
  14. //4.输出
  15. //直接输出
  16. result.foreach(println)
  17. //输出到指定path(可以是文件/夹)
  18. result.repartition(1).saveAsTextFile("data/output/result")
  19. //为了便于查看Web-UI可以让程序
  20. Thread.sleep(1000 * 60)
  21. //5.关闭资源
  22. sc.stop()

四、spark-submit 详细参数说明

参数名参数说明
--mastermaster 的地址,提交任务到哪里执行,例如 spark://host:port,  yarn,  local
--deploy-mode 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client
--class应用程序的主类,仅针对 java 或 scala 应用
--name应用程序的名称
--jars用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下
--packages包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标
--exclude-packages为了避免冲突 而指定不包含的 package
--repositories远程 repository
--conf PROP=VALUE

 指定 spark 配置属性的值,

 例如-conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m"

--properties-file加载的配置文件,默认为 conf/spark-defaults.conf
--driver-memory Driver内存,默认 1G
--driver-java-options传给 driver 的额外的 Java 选项
--driver-library-path 传给 driver 的额外的库路径
--driver-class-path 传给 driver 的额外的类路径
--driver-cores Driver 的核数,默认是1。在 yarn 或者 standalone 下使用
--executor-memory每个 executor 的内存,默认是1G
--total-executor-cores 所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用
--num-executors启动的 executor 数量。默认为2。在 yarn 下使用
--executor-core每个 executor 的核数。在yarn或者standalone下使用

 五、完整代码实现

5.1 WordCount.scala文件在本地运行:

  1. package net.objet
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object WordCount {
  5. def main(args: Array[String]): Unit = {
  6. //1.env/准备sc/SparkContext/Spark上下文执行环境
  7. val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
  8. val sc: SparkContext = new SparkContext(conf)
  9. sc.setLogLevel("WARN")
  10. //2.source/读取数据
  11. val lines: RDD[String] = sc.textFile("data/input/words.txt")
  12. //3.transformation/数据操作/转换
  13. //切割:RDD[一个个的单词]
  14. val words: RDD[String] = lines.flatMap(_.split(" "))
  15. //记为1:RDD[(单词, 1)]
  16. val wordAndOnes: RDD[(String, Int)] = words.map((_,1))
  17. val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_)
  18. //4.输出
  19. //直接输出
  20. result.foreach(println)
  21. //输出到指定path(可以是文件/夹)
  22. result.repartition(1).saveAsTextFile("data/output/result")
  23. //为了便于查看Web-UI可以让程序
  24. Thread.sleep(1000 * 60)
  25. //5.关闭资源
  26. sc.stop()
  27. }
  28. }

5.2 WordCount.scala文件在yarm上运行:

  1. package net
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.{SparkConf, SparkContext}
  4. object WordCount {
  5. def main(args: Array[String]): Unit = {
  6. if(args.length < 2){
  7. println("请指定input和output")
  8. System.exit(1)//非0表示非正常退出程序
  9. }
  10. //1.env/准备sc/SparkContext/Spark上下文执行环境
  11. val conf: SparkConf = new SparkConf().setAppName("wc") //.setMaster("local[*]")
  12. val sc: SparkContext = new SparkContext(conf)
  13. sc.setLogLevel("WARN")
  14. //2.source/读取数据
  15. val lines: RDD[String] = sc.textFile(args(0))
  16. //3.transformation/数据操作/转换
  17. //切割:RDD[一个个的单词]
  18. val words: RDD[String] = lines.flatMap(_.split(" "))
  19. //记为1:RDD[(单词, 1)]
  20. val wordAndOnes: RDD[(String, Int)] = words.map((_,1))
  21. val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_)
  22. //4.输出
  23. //直接输出
  24. result.foreach(println)
  25. //输出到指定path(可以是文件/夹)
  26. result.repartition(1).saveAsTextFile(args(1))
  27. //为了便于查看Web-UI可以让程序Thread.sleep(1000 * 60)
  28. //5.关闭资源
  29. sc.stop()
  30. }
  31. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/572479
推荐阅读
相关标签
  

闽ICP备14008679号