赞
踩
1.Spark初始
1.什么是Spark
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行计算框架,Spark拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
Spark是Scala编写,方便快速编程。
2.总体技术栈讲解
Apache Mesos跟YARN类似,称为分布式计算内核。Apache Mesos从计算机(物理或虚拟)中提取CPU,内存,存储和其他计算资源,从而使容错和弹性的分布式系统易于构建和有效运行。
Tachyon是个分布式的内存文件系统,它在减轻Spark内存压力的同时赋予了Spark内存快速大量数据读写的能力。Tachyon把存储与数据读写的功能从Spark中分离,使得Spark更专注在计算的本身,以求通过更细的分工达到更高的执行效率。
3.Spark演变历史
1)2009年,Spark诞生于伯克利大学AMPLab,属于伯克利大学的研究性项目;
2)2010年,通过BSD许可协议正式对外开源发布;
3)2012年,Spark第一篇论文发布,第一个正式版(Spark 0.6.0)发布;
4)2013年,成为了Aparch基金项目;发布Spark Streaming、Spark Mllib(机器学习)、Shark(Spark on Hadoop);
5)2014年,Spark 成为 Apache 的顶级项目;5月底Spark1.0.0发布;发布Spark Graphx(图计算)、Spark SQL代替Shark;
6)2015年,推出DataFrame(大数据分析);2015年至今,Spark在国内IT行业变得愈发火爆,大量的公司开始重点部署或者使用Spark来替代MapReduce、Hive、Storm等传统的大数据计算框架;
7)2016年,推出dataset(更强的数据分析手段);
8)2017年,structured streaming发布;
9)2018年,Spark2.4.0发布,成为全球最大的开源项目。
4.Spark与MapReduce的区别
1)都是分布式计算框架,Spark基于内存,MR基于HDFS。Spark处理数据的能力一般是MR的十倍以上,Spark中除了基于内存计算外,还有DAG有向无环图来切分任务的执行先后顺序。
5.Spark运行模式
Local
多用于本地测试,如在eclipse,idea中写程序测试等。
Standalone
Standalone是Spark自带的一个资源调度框架,它支持完全分布式。
Yarn
Hadoop生态圈里面的一个资源调度框架,Spark也是可以基于Yarn来计算的。
Mesos
资源调度框架。
要基于Yarn来进行资源调度,必须实现AppalicationMaster接口,Spark实现了这个接口,所以可以基于Yarn。
6. 使用方式
Master URL Meaning
local spark在本地使用一个worker线程运行,没有并发。
local[K] 在本地使用K个worker线程运行作业。一般K是本地计算机逻辑核心数
local[K,F] 在本地使用K个worker线程运行作业,并设置F个最大失败次数。
local[] 在本地使用本地计算机逻辑内核数个worker线程运行作业。
local[,F] 在本地使用本地计算机逻辑内核数个worker线程运行作业以及设置F个最大失败次数。
spark://HOST:PORT 连接到指定的spark独立集群的master。端口号默认是7077。
spark://HOST1:PORT1,HOST2:PORT2 连接到使用zk配置了master高可用的集群。必须配置ha中所有的master地址。
mesos://HOST:PORT 连接到Mesos集群。端口号默认5050。
yarn 连接到YARN集群。使用–deploy-mode指定是客户端模式还是集群模式。使用HADOOP_CONF_DIR或者YARN_CONF_DIR环境变量指定YARN的地址信息。
k8s://HOST:PORT 使用集群模式连接到Kubernates集群。现阶段不支持k8s的客户端模式。
2.Spark Java-Scala 混编Maven开发
1.IDEA创建Maven 项目
1)创建项目
2)创建选择 maven-archetype-quickstart
3)配置名称,点击下一步配置Maven及本地Maven仓库地址。
4)配置项目名称和位置,并创建。
5)更新替换Maven pom.xml文件,注意groupId,artifactId,version不要更新替换。
6)在main 目录下创建javaCode和scalaCode 并指定为源目录。名称任意。
将main下的javaCode和scalaCode指定为源目录:
2.配置的pom.xml文件
demo-parent中的pom.xml内容
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.bjsxt.spark.demo</groupId> <artifactId>demo-parent</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <!-- Spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.3.1</version> </dependency> <!-- SparkSQL --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.1</version> </dependency> <!-- SparkSQL ON Hive--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.3.1</version> </dependency> <!--mysql依赖的jar包--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency> <!--SparkStreaming--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.3.1</version> <!--<scope>provided</scope>--> </dependency> <!-- SparkStreaming + Kafka --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.3.1</version> </dependency> <!-- 向kafka 生产数据需要包 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.0</version> </dependency> <!--连接 Redis 需要的包--> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.6.1</version> </dependency> <!-- Scala 包--> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.7</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>2.11.7</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-reflect</artifactId> <version>2.11.7</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.12</version> </dependency> <dependency> <groupId>com.google.collections</groupId> <artifactId>google-collections</artifactId> <version>1.0</version> </dependency> </dependencies> <build> <plugins> <!-- 在maven项目中既有java又有scala代码时配置 maven-scala-plugin 插件打包时可以将两类代码一起打包 --> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <!-- maven打jar包需要插件 --> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>2.4</version> <configuration> <!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” --> <appendAssemblyId>false</appendAssemblyId> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.bjsxt.scalaspark.sql.windows.OverFunctionOnHive</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>assembly</goal> </goals> </execution> </executions> </plugin> <!-- 以上assembly可以将依赖的包打入到一个jar包中,下面这种方式是使用maven原生的方式打jar包,不将依赖的包打入到最终的jar包中 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>2.4</version> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <!-- 指定当前主类运行时找依赖的jar包时 所有依赖的jar包存放路径的前缀 --> <classpathPrefix>/alljars/lib</classpathPrefix> <mainClass>com.bjsxt.javaspark.sql.CreateDataSetFromHive</mainClass> </manifest> </archive> </configuration> </plugin> <!-- 拷贝依赖的jar包到lib目录 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> <id>copy</id> <phase>package</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <outputDirectory> <!-- 将依赖的jar 包复制到target/lib下 --> ${project.build.directory}/lib </outputDirectory> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
3.JavaWordCount.java
package com.bjsxt.spark.demo; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; public class JavaWordCount { public static void main(String[] args) { // 创建spark配置对象 SparkConf conf = new SparkConf(); // 设置运行模式:local conf.setMaster("local"); // 设置本程序名称 conf.setAppName("java word count"); // 创建spark的上下文 JavaSparkContext context = new JavaSparkContext(conf); // 读取指定文件,得到RDD JavaRDD<String> docRDD = context.textFile("demo/hello.txt"); // 将文档中的每个句子使用空格切割,并将所有切割后的单词放到一个集合中:RDD JavaRDD<String> wordsRDD = docRDD.flatMap(new FlatMapFunction<String, String>() { public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); // 将单词映射为单词和次数 JavaPairRDD<String, Integer> wordCountsRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) throws Exception { // 单词 1 Tuple2<String, Integer> tuple2 = new Tuple2<String, Integer>(s, 1); return tuple2; } }); // 设置聚合的方式,数字相加 JavaPairRDD<String, Integer> resultRDD = wordCountsRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer num1, Integer num2) throws Exception { return num1 + num2; } }); // 输出计算结果 resultRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() { public void call(Tuple2<String, Integer> pair) throws Exception { System.out.println(pair._1 + " === " + pair._2); } }); // 关闭spark上下文对象 context.stop(); // 该方法调用了stop() // context.close(); } }
4.ScalaWordCount.scala
package com.bjsxt.spark.demo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object ScalaWordCount { def main(args: Array[String]): Unit = { // 创建配置对象 val conf = new SparkConf() // 设置运行模式:本地 conf.setMaster("local") // 设置当前应用的名称 conf.setAppName("scala word count") // 创建spark的上下文对象 val context = new SparkContext(conf) // 读取文件 val docRDD: RDD[String] = context.textFile("demo/hello.txt ") // 将文件中每个句子用空格切割为单词 val wordsRDD: RDD[String] = docRDD.flatMap(_.split(" ")) // 将单词映射为二元组 val wordsCountRDD: RDD[(String, Int)] = wordsRDD.map((_, 1)) // 聚合 val resultRDD: RDD[(String, Int)] = wordsCountRDD.reduceByKey((num1, num2) => num1 + num2) // 打印结果 resultRDD.foreach(println) // 关闭上下文对象 context.stop() } }
3.SparkCore
1.RDD
概念
RDD(Resilient Distributed Dateset),弹性分布式数据集。
RDD的五大特性:
1.RDD是由一系列的partition组成的。
2.函数是作用在每一个partition(split)上的。
3.RDD之间有一系列的依赖关系。
4.分区器是作用在K,V格式的RDD上。
5.RDD提供一系列最佳的计算位置。
RDD理解图:
数据的血统:
注意:
textFile方法底层封装的是读取MR读取文件的方式,读取文件之前先split,默认split大小是一个block大小。
RDD实际上不存储数据,这里方便理解,暂时理解为存储数据。
什么是K,V格式的RDD?
如果RDD里面存储的数据都是二元组对象,那么这个RDD我们就叫做K,V格式的RDD。
哪里体现RDD的弹性(容错)?
partition数量,大小没有限制,体现了RDD的弹性。
RDD之间依赖关系,可以基于上一个RDD重新计算出RDD。
哪里体现RDD的分布式?
RDD是由Partition组成,partition是分布在不同节点上的。
RDD提供计算最佳位置,体现了数据本地化。体现了大数据中“计算向数据移动”的理念。
2.Spark任务执行原理
以上图中有四个机器节点,Driver和Worker是启动在节点上的进程,运行在JVM中的进程。
Driver与集群节点之间有频繁的通信。
Driver负责任务(tasks)的分发和结果的回收。任务的调度。如果task的计算结果非常大就不要回收了。会造成oom。
Worker是Standalone资源调度框架里面资源管理的从节点。也是JVM进程。
Master是Standalone资源调度框架里面资源管理的主节点。也是JVM进程。
3.Spark代码流程
1.创建SparkConf对象
可以设置Application name。
可以设置运行模式及资源需求。
2.创建SparkContext对象
3.基于Spark的上下文创建一个RDD,对RDD进行处理。
4.应用程序中要有Action类算子来触发Transformation类算子执行。
5.关闭Spark上下文对象SparkContext。
4.Transformations转换算子
概念:
Transformations类算子是一类算子(函数)叫做转换算子,如map,flatMap,reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。
Transformation类算子:
filter
过滤符合条件的记录数,true保留,false过滤掉。
过滤出包含“nihao”的数据
map
将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
特点:输入一条,输出一条数据。
flatMap
先map后flat。与map类似,每个输入项可以映射为0到多个输出项。
sample
随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。
reduceByKey
将相同的Key根据相应的逻辑进行处理。
sortByKey/sortBy
作用在K,V格式的RDD上,对key进行升序或者降序排序。
SortBy.scala
package com.bjsxt.spark.demo import org.apache.spark.{SparkConf, SparkContext} object ScalaWordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local") conf.setAppName("scala word count") val sc = new SparkContext(conf) sc.textFile( "demo-parent/demo-01/sortby.txt" ).flatMap( _ split "[ \\t]" ).map( (_, 1) ).reduceByKey( (num1, num2) => num1 + num2 ).sortBy( _._2, false //flase的意思是倒排 ).foreach(println) sc.stop() } }
5.Action行动算子
概念:
Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。
Action类算子
count
返回数据集中的元素数。会在结果计算完成后回收到Driver端。
take(n)
返回一个包含数据集前n个元素的集合。
first
first=take(1),返回数据集中的第一个元素。
foreach
循环遍历数据集中的每个元素,运行相应的逻辑。
collect
将计算结果回收到Driver端。
CollectDemo.scala
package com.bjsxt.spark.demo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object CollectDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local[4]") conf.setAppName("collect demo") val sc = new SparkContext(conf) val docRDD: RDD[String] = sc.textFile("demo-parent/demo-02/collect.txt") // val wordsRDD: RDD[String] = docRDD.flatMap(_ split " ") val wordsRDD: RDD[String] = docRDD.flatMap(_ split "[ \\t]") val pairRDD: RDD[(String, Int)] = wordsRDD.map((_, 1)) val wordCountsRDD: RDD[(String, Int)] = pairRDD.reduceByKey((num1, num2) => num1 + num2) // 将结果以Array的形式收集到Driver端 val tuples: Array[(String, Int)] = wordCountsRDD.collect() // 将结果以Map的形式收集到Driver端 val stringToInt: collection.Map[String, Int] = wordCountsRDD.collectAsMap() tuples.foreach(println) stringToInt.foreach(entry =>println(entry._1 + "\t\t\t" + entry._2)) sc.stop() } }
思考:一千万条数据量的文件,过滤掉出现次数多的记录(filter),并且其余记录按照出现次数降序排序。(sort)
文件:
代码:
6.控制算子
概念:
控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。
cache
默认将RDD的数据持久化到内存中。cache是懒执行。
注意:chche () = persist()=persist(StorageLevel.Memory_Only)
测试cache文件:
文件:见“NASA_access_log_Aug95”文件。
测试代码:
package com.bjsxt.spark.demo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object CacheDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local[8]") conf.setAppName("cache demo") val sc = new SparkContext(conf) val textRDD: RDD[String] = sc.textFile("demo-parent/demo-03/persistData.txt") // 缓存到内存,下次计算使用该RDD textRDD.cache() val start = System.currentTimeMillis() println(s"共有数据:${textRDD.count()}条!") val end = System.currentTimeMillis() println(s"共有数据:${textRDD.count()}条!") val end1 = System.currentTimeMillis() println(s"第一次运行耗时:${end - start} ms") println(s"第二次运行耗时:${end1 - end} ms") Thread.sleep(10000) sc.stop() } }
persist:
可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本数。
持久化级别如下:
cache和persist的注意事项:
1.cache和persist都是懒执行,必须有一个action类算子触发执行。
2.推荐cache和persist算子的返回值赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。不使用返回值也可以,源码可见返回值对象还是原来的对象。
3.cache和persist算子后不能立即紧跟action算子。
4.cache和persist算子持久化的数据当applilcation执行完成之后会被清除。
错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。
checkpoint
checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。checkpoint目录数据当application执行完之后不会被清除。
checkpoint 的执行原理:
1.当RDD的job执行完毕后,会从finalRDD从后往前回溯。
2.当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
3.Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
checkpoint可以缓存到外部系统中,persist只能在本spark应用中起作用,如果想用到其他Spark应用,需要使用checkpoint。
checkpoint通常是为了容灾,如果为了提升性能的话,基本是不会考虑checkpoint的,因为它涉及磁盘的I/O,磁盘I/O是非常慢的,因此这种情况即使重算可能都会比checkpoint快。
设置checkpoint目录,设置以后,运行spark程序会在hdfs上创建目录。
使用:
1.如果Job运行很慢的话,选择使用persist,如果Job经常失败的话,选择使用checkpoint。
2.persist并没有完全缓存到外部系统的选项,它只有当内存存储不开才可能将剩余的数据缓存到外部系统。
3.persist可以完全替代cache。
package com.bjsxt.spark.demo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object CheckpointDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local[8]") conf.setAppName("cache demo") val sc = new SparkContext(conf) sc.setCheckpointDir("./checkpoint") val textRDD: RDD[String] = sc.textFile("demo-parent/demo-03/persistData.txt") val filteredRDD = textRDD.filter(!_.matches("\"\\d*-\\d*\"\t\"[0-9]{4}\\/[0-9]{1,2}\\/\\d{1,2} \\d{1,2}:\\d{1,2}:\\d{1,2}.\\d{1,3}\"\\t\"\\d*\"\\t\"\\d*\"\\t\"\\d*\"")) // // 先缓存到内存 // filteredRDD.cache() filteredRDD.checkpoint() var start = System.currentTimeMillis() println(s"共有数据:${filteredRDD.count()}条!") var end = System.currentTimeMillis() println(s"共耗时:${end - start} ms") start = System.currentTimeMillis() println(s"共有数据:${filteredRDD.count()}条!") end = System.currentTimeMillis() println(s"共耗时:${end - start} ms") sc.stop() } }
4.集群搭建以及测试
1.搭建
Standalone
1).下载安装包,解压
解压到/opt目录下
tar -zxf spark-2.3.1-bin-hadoop2.6.tgz -C /opt
2).进入安装包的conf目录下,修改slaves.template文件,添加从节点。保存。
cp slaves.template slaves
vim slaves
3).修改spark-env.sh
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
其中:
SPARK_MASTER_HOST:master的ip
SPARK_MASTER_PORT:提交任务的端口,默认是7077
SPARK_WORKER_CORES:每个worker从节点能够支配的core的个数
SPARK_WORKER_MEMORY:每个worker从节点能够支配的内存数
4).同步到其他节点上
for i in 2 3 4; do scp -r spark-2.3.1-bin-hadoop2.6/ node$i:pwd
; done
5).启动集群
进入sbin目录下,执行当前目录下的./start-all.sh
6).搭建客户端
将spark安装包原封不动的拷贝到一个新的节点上,然后,在新的节点上提交任务即可。
注意:
8080是Spark WEBUI界面的端口,7077是Spark任务提交的端口。
修改master的WEBUI端口:
修改start-master.sh即可。
在start-all.sh中,通过start-master.sh启动master:
start-master.sh中:
也可以在Master节点上导入临时环境变量,只是作用于之后的程序,重启就无效了。
[root@node1 sbin]# export SPARK_MASTER_WEBUI_PORT=9999
删除临时环境变量:
[root@node1 sbin]# export -n SPARK_MASTER_WEBUI_PORT=9999
yarn
1).同standalone。
2).在客户端中配置:
2.测试
PI案例:
Standalone提交命令:
YARN提交命令:
如果使用YARN,注意要修改yarn-site.xml中的
yarn.nodemanager.vmem-pmem-ratio,虚拟内存和物理内存比例。
默认值2.1对我们的虚拟机来说太小,因为虚拟机内存1GB。设置为3就可以了。
官网对打包和提交spark应用的说明:
http://spark.apache.org/docs/2.3.1/submitting-applications.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。