当前位置:   article > 正文

spark01_spark1

spark1

1.Spark初始
1.什么是Spark
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行计算框架,Spark拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
Spark是Scala编写,方便快速编程。
2.总体技术栈讲解

Apache Mesos跟YARN类似,称为分布式计算内核。Apache Mesos从计算机(物理或虚拟)中提取CPU,内存,存储和其他计算资源,从而使容错和弹性的分布式系统易于构建和有效运行。
Tachyon是个分布式的内存文件系统,它在减轻Spark内存压力的同时赋予了Spark内存快速大量数据读写的能力。Tachyon把存储与数据读写的功能从Spark中分离,使得Spark更专注在计算的本身,以求通过更细的分工达到更高的执行效率。
  • 1
  • 2

3.Spark演变历史
1)2009年,Spark诞生于伯克利大学AMPLab,属于伯克利大学的研究性项目;
2)2010年,通过BSD许可协议正式对外开源发布;
3)2012年,Spark第一篇论文发布,第一个正式版(Spark 0.6.0)发布;
4)2013年,成为了Aparch基金项目;发布Spark Streaming、Spark Mllib(机器学习)、Shark(Spark on Hadoop);
5)2014年,Spark 成为 Apache 的顶级项目;5月底Spark1.0.0发布;发布Spark Graphx(图计算)、Spark SQL代替Shark;
6)2015年,推出DataFrame(大数据分析);2015年至今,Spark在国内IT行业变得愈发火爆,大量的公司开始重点部署或者使用Spark来替代MapReduce、Hive、Storm等传统的大数据计算框架;
7)2016年,推出dataset(更强的数据分析手段);
8)2017年,structured streaming发布;
9)2018年,Spark2.4.0发布,成为全球最大的开源项目。
4.Spark与MapReduce的区别
1)都是分布式计算框架,Spark基于内存,MR基于HDFS。Spark处理数据的能力一般是MR的十倍以上,Spark中除了基于内存计算外,还有DAG有向无环图来切分任务的执行先后顺序。
5.Spark运行模式
Local
多用于本地测试,如在eclipse,idea中写程序测试等。
Standalone
Standalone是Spark自带的一个资源调度框架,它支持完全分布式。
Yarn
Hadoop生态圈里面的一个资源调度框架,Spark也是可以基于Yarn来计算的。
Mesos
资源调度框架。
要基于Yarn来进行资源调度,必须实现AppalicationMaster接口,Spark实现了这个接口,所以可以基于Yarn。
6. 使用方式
Master URL Meaning
local spark在本地使用一个worker线程运行,没有并发。
local[K] 在本地使用K个worker线程运行作业。一般K是本地计算机逻辑核心数
local[K,F] 在本地使用K个worker线程运行作业,并设置F个最大失败次数。
local[] 在本地使用本地计算机逻辑内核数个worker线程运行作业。
local[
,F] 在本地使用本地计算机逻辑内核数个worker线程运行作业以及设置F个最大失败次数。
spark://HOST:PORT 连接到指定的spark独立集群的master。端口号默认是7077。
spark://HOST1:PORT1,HOST2:PORT2 连接到使用zk配置了master高可用的集群。必须配置ha中所有的master地址。
mesos://HOST:PORT 连接到Mesos集群。端口号默认5050。
yarn 连接到YARN集群。使用–deploy-mode指定是客户端模式还是集群模式。使用HADOOP_CONF_DIR或者YARN_CONF_DIR环境变量指定YARN的地址信息。
k8s://HOST:PORT 使用集群模式连接到Kubernates集群。现阶段不支持k8s的客户端模式。

2.Spark Java-Scala 混编Maven开发
1.IDEA创建Maven 项目
1)创建项目

2)创建选择 maven-archetype-quickstart

3)配置名称,点击下一步配置Maven及本地Maven仓库地址。

4)配置项目名称和位置,并创建。

5)更新替换Maven pom.xml文件,注意groupId,artifactId,version不要更新替换。
6)在main 目录下创建javaCode和scalaCode 并指定为源目录。名称任意。

将main下的javaCode和scalaCode指定为源目录:

2.配置的pom.xml文件
demo-parent中的pom.xml内容

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bjsxt.spark.demo</groupId>
    <artifactId>demo-parent</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- Spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- SparkSQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- SparkSQL  ON  Hive-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!--mysql依赖的jar包-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <!--SparkStreaming-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <!-- SparkStreaming + Kafka -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- 向kafka 生产数据需要包 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
        <!--连接 Redis 需要的包-->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.6.1</version>
        </dependency>

        <!-- Scala 包-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.7</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.7</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.7</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.12</version>
        </dependency>
        <dependency>
            <groupId>com.google.collections</groupId>
            <artifactId>google-collections</artifactId>
            <version>1.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 在maven项目中既有java又有scala代码时配置 maven-scala-plugin 插件打包时可以将两类代码一起打包 -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- maven打jar包需要插件 -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.bjsxt.scalaspark.sql.windows.OverFunctionOnHive</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- 以上assembly可以将依赖的包打入到一个jar包中,下面这种方式是使用maven原生的方式打jar包,不将依赖的包打入到最终的jar包中 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <!-- 指定当前主类运行时找依赖的jar包时 所有依赖的jar包存放路径的前缀 -->
                            <classpathPrefix>/alljars/lib</classpathPrefix>
                            <mainClass>com.bjsxt.javaspark.sql.CreateDataSetFromHive</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <!-- 拷贝依赖的jar包到lib目录 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>
                                <!-- 将依赖的jar 包复制到target/lib下 -->
                                ${project.build.directory}/lib
                            </outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172

3.JavaWordCount.java

package com.bjsxt.spark.demo;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {

    public static void main(String[] args) {
        // 创建spark配置对象
        SparkConf conf = new SparkConf();
        // 设置运行模式:local
        conf.setMaster("local");
        // 设置本程序名称
        conf.setAppName("java word count");
        // 创建spark的上下文
        JavaSparkContext context = new JavaSparkContext(conf);

        // 读取指定文件,得到RDD
        JavaRDD<String> docRDD = context.textFile("demo/hello.txt");

        // 将文档中的每个句子使用空格切割,并将所有切割后的单词放到一个集合中:RDD
        JavaRDD<String> wordsRDD = docRDD.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        // 将单词映射为单词和次数
        JavaPairRDD<String, Integer> wordCountsRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                // 单词   1
                Tuple2<String, Integer> tuple2 = new Tuple2<String, Integer>(s, 1);
                return tuple2;
            }
        });

        // 设置聚合的方式,数字相加
        JavaPairRDD<String, Integer> resultRDD = wordCountsRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer num1, Integer num2) throws Exception {
                return num1 + num2;
            }
        });

        // 输出计算结果
        resultRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> pair) throws Exception {
                System.out.println(pair._1 + " === " + pair._2);
            }
        });

        // 关闭spark上下文对象
        context.stop();
        // 该方法调用了stop()
//        context.close();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

4.ScalaWordCount.scala

package com.bjsxt.spark.demo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    // 创建配置对象
    val conf = new SparkConf()
    // 设置运行模式:本地
    conf.setMaster("local")
    // 设置当前应用的名称
    conf.setAppName("scala word count")
    // 创建spark的上下文对象
    val context = new SparkContext(conf)
    // 读取文件
    val docRDD: RDD[String] = context.textFile("demo/hello.txt ")
    // 将文件中每个句子用空格切割为单词
    val wordsRDD: RDD[String] = docRDD.flatMap(_.split(" "))
    // 将单词映射为二元组
    val wordsCountRDD: RDD[(String, Int)] = wordsRDD.map((_, 1))
    // 聚合
    val resultRDD: RDD[(String, Int)] = wordsCountRDD.reduceByKey((num1, num2) => num1 + num2)
    // 打印结果
    resultRDD.foreach(println)
    // 关闭上下文对象
    context.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

3.SparkCore
1.RDD
概念
RDD(Resilient Distributed Dateset),弹性分布式数据集。
RDD的五大特性:
1.RDD是由一系列的partition组成的。
2.函数是作用在每一个partition(split)上的。
3.RDD之间有一系列的依赖关系。
4.分区器是作用在K,V格式的RDD上。
5.RDD提供一系列最佳的计算位置。
RDD理解图:
数据的血统:

注意:
textFile方法底层封装的是读取MR读取文件的方式,读取文件之前先split,默认split大小是一个block大小。
RDD实际上不存储数据,这里方便理解,暂时理解为存储数据。
什么是K,V格式的RDD?
如果RDD里面存储的数据都是二元组对象,那么这个RDD我们就叫做K,V格式的RDD。
哪里体现RDD的弹性(容错)?
partition数量,大小没有限制,体现了RDD的弹性。
RDD之间依赖关系,可以基于上一个RDD重新计算出RDD。
哪里体现RDD的分布式?
RDD是由Partition组成,partition是分布在不同节点上的。
RDD提供计算最佳位置,体现了数据本地化。体现了大数据中“计算向数据移动”的理念。
2.Spark任务执行原理

以上图中有四个机器节点,Driver和Worker是启动在节点上的进程,运行在JVM中的进程。
Driver与集群节点之间有频繁的通信。
Driver负责任务(tasks)的分发和结果的回收。任务的调度。如果task的计算结果非常大就不要回收了。会造成oom。
Worker是Standalone资源调度框架里面资源管理的从节点。也是JVM进程。
Master是Standalone资源调度框架里面资源管理的主节点。也是JVM进程。

3.Spark代码流程
1.创建SparkConf对象
可以设置Application name。
可以设置运行模式及资源需求。
2.创建SparkContext对象
3.基于Spark的上下文创建一个RDD,对RDD进行处理。
4.应用程序中要有Action类算子来触发Transformation类算子执行。
5.关闭Spark上下文对象SparkContext。
4.Transformations转换算子
概念:
Transformations类算子是一类算子(函数)叫做转换算子,如map,flatMap,reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。
Transformation类算子:
filter
过滤符合条件的记录数,true保留,false过滤掉。
过滤出包含“nihao”的数据

map
将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
特点:输入一条,输出一条数据。

flatMap
先map后flat。与map类似,每个输入项可以映射为0到多个输出项。

sample
随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。

reduceByKey
将相同的Key根据相应的逻辑进行处理。
sortByKey/sortBy
作用在K,V格式的RDD上,对key进行升序或者降序排序。

SortBy.scala

package com.bjsxt.spark.demo

import org.apache.spark.{SparkConf, SparkContext}

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("scala word count")
    val sc = new SparkContext(conf)

    sc.textFile(
      "demo-parent/demo-01/sortby.txt"
    ).flatMap(
      _ split "[ \\t]"
    ).map(
      (_, 1)
    ).reduceByKey(
      (num1, num2) => num1 + num2
    ).sortBy(
      _._2, false  //flase的意思是倒排
    ).foreach(println)

    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

5.Action行动算子
概念:
Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。
Action类算子
count
返回数据集中的元素数。会在结果计算完成后回收到Driver端。

take(n)
返回一个包含数据集前n个元素的集合。

first
first=take(1),返回数据集中的第一个元素。

foreach
循环遍历数据集中的每个元素,运行相应的逻辑。

collect
将计算结果回收到Driver端。

CollectDemo.scala

package com.bjsxt.spark.demo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CollectDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[4]")
    conf.setAppName("collect demo")
    val sc = new SparkContext(conf)

    val docRDD: RDD[String] = sc.textFile("demo-parent/demo-02/collect.txt")
//    val wordsRDD: RDD[String] = docRDD.flatMap(_ split " ")
    val wordsRDD: RDD[String] = docRDD.flatMap(_ split "[ \\t]")

    val pairRDD: RDD[(String, Int)] = wordsRDD.map((_, 1))

    val wordCountsRDD: RDD[(String, Int)] = pairRDD.reduceByKey((num1, num2) => num1 + num2)

    // 将结果以Array的形式收集到Driver端
    val tuples: Array[(String, Int)] = wordCountsRDD.collect()
    // 将结果以Map的形式收集到Driver端
    val stringToInt: collection.Map[String, Int] = wordCountsRDD.collectAsMap()

    tuples.foreach(println)
    stringToInt.foreach(entry =>println(entry._1 + "\t\t\t" + entry._2))
    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

思考:一千万条数据量的文件,过滤掉出现次数多的记录(filter),并且其余记录按照出现次数降序排序。(sort)
文件:

代码:

6.控制算子
概念:
控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。
cache
默认将RDD的数据持久化到内存中。cache是懒执行。
注意:chche () = persist()=persist(StorageLevel.Memory_Only)
测试cache文件:
文件:见“NASA_access_log_Aug95”文件。
测试代码:

 package com.bjsxt.spark.demo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CacheDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[8]")
    conf.setAppName("cache demo")
    val sc = new SparkContext(conf)

    val textRDD: RDD[String] = sc.textFile("demo-parent/demo-03/persistData.txt")
    // 缓存到内存,下次计算使用该RDD
    textRDD.cache()
    val start = System.currentTimeMillis()
    println(s"共有数据:${textRDD.count()}条!")
    val end = System.currentTimeMillis()

    println(s"共有数据:${textRDD.count()}条!")
    val end1 = System.currentTimeMillis()
    println(s"第一次运行耗时:${end - start} ms")
    println(s"第二次运行耗时:${end1 - end} ms")
    Thread.sleep(10000)
    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

persist:
可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本数。
持久化级别如下:

cache和persist的注意事项:
1.cache和persist都是懒执行,必须有一个action类算子触发执行。
2.推荐cache和persist算子的返回值赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。不使用返回值也可以,源码可见返回值对象还是原来的对象。
3.cache和persist算子后不能立即紧跟action算子。
4.cache和persist算子持久化的数据当applilcation执行完成之后会被清除。
错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。
checkpoint
checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。checkpoint目录数据当application执行完之后不会被清除。
checkpoint 的执行原理:
1.当RDD的job执行完毕后,会从finalRDD从后往前回溯。
2.当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
3.Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
checkpoint可以缓存到外部系统中,persist只能在本spark应用中起作用,如果想用到其他Spark应用,需要使用checkpoint。
checkpoint通常是为了容灾,如果为了提升性能的话,基本是不会考虑checkpoint的,因为它涉及磁盘的I/O,磁盘I/O是非常慢的,因此这种情况即使重算可能都会比checkpoint快。
设置checkpoint目录,设置以后,运行spark程序会在hdfs上创建目录。

使用:
1.如果Job运行很慢的话,选择使用persist,如果Job经常失败的话,选择使用checkpoint。
2.persist并没有完全缓存到外部系统的选项,它只有当内存存储不开才可能将剩余的数据缓存到外部系统。
3.persist可以完全替代cache。

package com.bjsxt.spark.demo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[8]")
    conf.setAppName("cache demo")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("./checkpoint")

    val textRDD: RDD[String] = sc.textFile("demo-parent/demo-03/persistData.txt")

    val filteredRDD = textRDD.filter(!_.matches("\"\\d*-\\d*\"\t\"[0-9]{4}\\/[0-9]{1,2}\\/\\d{1,2} \\d{1,2}:\\d{1,2}:\\d{1,2}.\\d{1,3}\"\\t\"\\d*\"\\t\"\\d*\"\\t\"\\d*\""))

//    // 先缓存到内存
//    filteredRDD.cache()
    filteredRDD.checkpoint()
    var start = System.currentTimeMillis()
    println(s"共有数据:${filteredRDD.count()}条!")
    var end = System.currentTimeMillis()
    println(s"共耗时:${end - start} ms")

    start = System.currentTimeMillis()
    println(s"共有数据:${filteredRDD.count()}条!")
    end = System.currentTimeMillis()
    println(s"共耗时:${end - start} ms")

    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

4.集群搭建以及测试
1.搭建
Standalone
1).下载安装包,解压

解压到/opt目录下
tar -zxf spark-2.3.1-bin-hadoop2.6.tgz -C /opt

2).进入安装包的conf目录下,修改slaves.template文件,添加从节点。保存。
cp slaves.template slaves
vim slaves

3).修改spark-env.sh
cp spark-env.sh.template spark-env.sh
vim spark-env.sh

其中:
SPARK_MASTER_HOST:master的ip
SPARK_MASTER_PORT:提交任务的端口,默认是7077
SPARK_WORKER_CORES:每个worker从节点能够支配的core的个数
SPARK_WORKER_MEMORY:每个worker从节点能够支配的内存数

4).同步到其他节点上
for i in 2 3 4; do scp -r spark-2.3.1-bin-hadoop2.6/ node$i:pwd; done

5).启动集群
进入sbin目录下,执行当前目录下的./start-all.sh

6).搭建客户端
将spark安装包原封不动的拷贝到一个新的节点上,然后,在新的节点上提交任务即可。
注意:
8080是Spark WEBUI界面的端口,7077是Spark任务提交的端口。
修改master的WEBUI端口:
修改start-master.sh即可。
在start-all.sh中,通过start-master.sh启动master:

start-master.sh中:

也可以在Master节点上导入临时环境变量,只是作用于之后的程序,重启就无效了。
[root@node1 sbin]# export SPARK_MASTER_WEBUI_PORT=9999

删除临时环境变量:
[root@node1 sbin]# export -n SPARK_MASTER_WEBUI_PORT=9999

yarn
1).同standalone。
2).在客户端中配置:

2.测试
PI案例:

Standalone提交命令:

YARN提交命令:
如果使用YARN,注意要修改yarn-site.xml中的
yarn.nodemanager.vmem-pmem-ratio,虚拟内存和物理内存比例。
默认值2.1对我们的虚拟机来说太小,因为虚拟机内存1GB。设置为3就可以了。

官网对打包和提交spark应用的说明:
http://spark.apache.org/docs/2.3.1/submitting-applications.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/593283
推荐阅读
相关标签
  

闽ICP备14008679号