赞
踩
此模式下,
1. 这个SparkSubmit进程又当爹、又当妈,既是客户提交任务的Client进程、又是Spark的driver程序、还充当着Spark执行Task的Executor角色
2.程序的运行状态可通过 http://:4040 查看,但是这是临时的,程序运行完后,这个UI也就失效了。我们可以启动Spark History Server,这
样就可以看到历史运行程序的信息了。
需要先启动Spark的Master和Worker守护进程,提交一个任务的命令如下:
./bin/spark-submit --class org.apache.spark.example.SparkPi --master spark://node01:7077 ./examples/jars/spark-examples_2.11-2.1.1.jar 100
此模式下,1. 会在所有有Worker进程的节点上启动Executor来执行应用程序。
\2. Master进程做为cluster manager,用来对应用程序申请的资源进行管理;
\3. SparkSubmit 做为Client端和运行driver程序;
\4. 运行结果在Shell里可见
注意,Worker进程生成几个Executor,每个Executor使用几个core,这些都可以在spark-env.sh里面配置
需要配置文件:
1.slaves 文件
2.spark-env.sh
## 设置JAVA安装目录
JAVA_HOME=/export/server/jdk
## HADOOP软件配置文件目录,读取HDFS上文件和运行Spark在YARN集群时需要,先提前配上
HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
YARN_CONF_DIR=/export/server/hadoop/etc/hadoop
## 指定spark老大Master的IP和提交任务的通信端口
SPARK_MASTER_HOST=node1
SPARK_MASTER_PORT=7077
SPARK_MASTER_WEBUI_PORT=8080
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=1g
总结:
spark: 4040 任务运行web-ui界面端口
spark: 8080 spark集群web-ui界面端口
spark: 7077 spark提交任务时的通信端口
hadoop: 50070集群web-ui界面端口
hadoop:8020/9000(老版本) 文件上传下载通信端口
原理:
1、SparkOnYarn的本质是把spark任务的class字节码文件打成jar包,上传到Yarn集群的JVM中去运行
2、Spark集群的相关角色(JVM进程)也会在Yarn的JVM中运行
3.SparkOnYarn需要:
-修改一些配置,让支持SparkOnYarn
-Spark程序打包成jar包
-Spark任务提交工具:bin/spark-submit
-Spark本身依赖的jars:在spark的安装目录的jars中有,提交任务的时候会被上传到Yarn/HDFS,或手动提前上传
4.SparkOnYarn 不需要Spark集群,只需要一个单机版spark解压包即可
5.SparkOnYarn根据Driver运行在哪里分为两种模式:client模式和cluster模式
vim /export/server/hadoop/etc/hadoop/yarn-site.xml
<configuration> <!-- 配置yarn主节点的位置 --> <property> <name>yarn.resourcemanager.hostname</name> <value>node1</value> </property> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <!-- 设置yarn集群的内存分配方案 --> <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>20480</value> </property> <property> <name>yarn.scheduler.minimum-allocation-mb</name> <value>2048</value> </property> <property> <name>yarn.nodemanager.vmem-pmem-ratio</name> <value>2.1</value> </property> <!-- 开启日志聚合功能 --> <property> <name>yarn.log-aggregation-enable</name> <value>true</value> </property> <!-- 设置聚合日志在hdfs上的保存时间 --> <property> <name>yarn.log-aggregation.retain-seconds</name> <value>604800</value> </property> <!-- 设置yarn历史服务器地址 --> <property> <name>yarn.log.server.url</name> <value>http://node1:19888/jobhistory/logs</value> </property> <!-- 关闭yarn内存检查 --> <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> </configuration>
进入配置目录
cd /export/server/spark/conf
修改配置文件名称
mv spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
添加内容:
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node1:8020/sparklog/
spark.eventLog.compress true
spark.yarn.historyServer.address node1:18080
修改配置文件
vim /export/server/spark/conf/spark-env.sh
增加如下内容:
## 配置spark历史日志存储地址
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1:8020/sparklog/ -Dspark.history.fs.cleaner.enabled=true"
注意:sparklog需要手动创建
hadoop fs -mkdir -p /sparklog
进入目录
cd /export/server/spark/conf
修改日志属性配置文件名称
mv log4j.properties.template log4j.properties
改变日志级别
vim log4j.properties
修改内容如下:
1.在HDFS上创建存储spark相关jar包的目录
hadoop fs -mkdir -p /spark/jars/
2.上传$SPARK_HOME/jars所有jar包到HDFS
hadoop fs -put /export/server/spark/jars/* /spark/jars/
3.在node1上修改spark-defaults.conf
vim /export/server/spark/conf/spark-defaults.conf
添加内容
spark.yarn.jars hdfs://node1:8020/spark/jars/*
start-all.sh
-启动MRHistoryServer服务,在node1执行命令
mr-jobhistory-daemon.sh start historyserver
- 启动Spark HistoryServer服务,,在node1执行命令
/export/server/spark/sbin/start-history-server.sh
- MRHistoryServer服务WEB UI页面:
http://node1:19888
- Spark HistoryServer服务WEB UI页面:
http://node1:18080/
cluster模式-开发使用
1、需要Yarn集群
2、历史服务器
3、提交任务的客户端工具-spark-submit命令
4、待提交的spark任务/程序的字节码–可以使用示例程序
- 启动HDFS和YARN服务,在node1执行命令
start-dfs.sh
start-yarn.sh
或
start-all.sh
-启动MRHistoryServer服务,在node1执行命令0
mr-jobhistory-daemon.sh start historyserver
- 启动Spark HistoryServer服务,,在node1执行命令
/export/server/spark/sbin/start-history-server.sh
- MRHistoryServer服务WEB UI页面:
http://node1:19888
- Spark HistoryServer服务WEB UI页面:
http://node1:18080/
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode client \
--driver-memory 512m \
--driver-cores 1 \
--executor-memory 512m \
--num-executors 2 \
--executor-cores 1 \
--class org.apache.spark.examples.SparkPi \
${SPARK_HOME}/examples/jars/spark-examples_2.12-3.0.1.jar \
10
cluster模式
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--num-executors 1 \
--class org.apache.spark.examples.SparkPi \
${SPARK_HOME}/examples/jars/spark-examples_2.12-3.0.1.jar \
10
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
RDD:弹性分布式数据集,是Spark中最基本的数据抽象,用来表示分布式集合,支持分布式操作!
《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-memory Cluster Computing》
Internally, each RDD is characterized by five main properties:
分区列表: A list of partitions
计算函数: A function for computing each split
依赖关系: A list of dependencies on other RDDs
分区器: Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
计算位置:Optionally, a list of preferred locations to compute each split on (e.g. block locations for
an HDFS file)
分类
1、Transformation:return new RDD (which create a new dataset from an existing one)
Lazy function need Action activate
2、Action: which return a value to the driver program after running a computation on the dataset
(Count, first, collect,take)
map
faltMap
filter
foreach
saveAsTextFile
每个RDD由多分区组成的,实际开发中如果涉及到资源相关操作建议对每个分区进行操作,即
使用mapPartitions代替map函数
使用foreachPartition代替foreach函数
1、减少/增加分区:repartition
2、减少分区:coalesce
sum
reduce
fold
aggregate
//aggregate(初始值)(局部聚合, 全局聚合)
底层一般用aggregate实现
在spark中有一个object对象PairRDDFunctions, 主要针对RDD的数据类型是key/Value分析处理
比如使用过的函数:reduceByKey、groupByKey
*Bykey函数:将相同key的value进行聚合操作,省去先分组再聚合
有key的聚合函数API如下:
groupByKey + sum/reduce
reduceByKey
foldByKey
aggregateByKey
sortBy
sortByKey
top
引入
缓存解决了热点数据频繁访问的的效率问题
在Spark开发中某些RDD的计算或转换可能会比较费时间。如果这些RDD后续还会被频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次使用到的时候就不用重新计算了,提高了程序运行的效率
引入
RDD的数据可以持久化,但是持久化/缓存可以把数据放在内存中,速度是快速度的,但是也是最不可靠的;也可以把数据放在磁盘上,但是磁盘可能会损坏
Checkpoint的产生就是为了更加可靠的数据持久化,在checkpoint的时候一般把数据放在HDFS上,就就天然的借助了HDFS天生的高容错性、高可靠性来实现数据的最大程度的安全,实现了RDD的容错和高可用性
API
sc.setCheckpointDir(HDFS路径) // 设置checkpoint路径,开发中一般设置为HDFS的目录
Add.chectpoint // 对计算复杂且后续会被频繁使用的RDD进行checkpoint
1.存储位置
缓存/持久化数据存默认存在内存, 一般设置为内存+磁盘(普通磁盘)
Checkpoint检查点:一般存储在HDFS
2.功能
缓存/持久化:保证数据后续使用的效率高
Checkpoint检查点:保证数据安全/也能一定程度上提高效率
3.对于依赖关系:
缓存/持久化:保留了RDD间的依赖关系
Checkpoint检查点:不保留RDD间的依赖关系
4.开发中如何使用?
对于计算复杂且后续会被频繁使用的RDD先进行缓存/持久化,再进行Checkpoint
Spark的DAG:就是spark任务/程序执行的流程图!
DAG的开始:从创建RDD开始
DAG的结束:到Action结束
一个Spark程序中有几个Action操作就有几个DAG!
Stage:是DAG中根据shuffle划分出来的阶段!
前面的阶段执行完才可以执行后面的阶段!
同一个阶段中的各个任务可以并行执行无需等待!
1.Application:应用,就是程序员编写的Spark代码,如WordCount代码 2.Driver:驱动程序,就是用来执行main方法的JVM进程,里面会执行一些Drive端的代码,如创建SparkContext,设置应用名,设置日志级别... 3.SparkContext:Spark运行时的上下文环境,用来和ClusterManager进行通信的,并进行资源的申请、任务的分配和监控等 4.ClusterManager:集群管理器,对于Standalone模式,就是Master,对于Yarn模式就是ResourceManager/ApplicationMaster,在集群上做统一的资源管理的进程 5.Worker:工作节点,是拥有CPU/内存等资源的机器,是真正干活的节点 6.Executor:运行在Worker中的JVM进程! 7.RDD:弹性分布式数据集 8.DAG:有向无环图,就是根据Action形成的RDD的执行流程图---静态的图 9.Job:作业,按照DAG进行执行就形成了Job---按照图动态的执行 10.Stage:DAG中,根据shuffle依赖划分出来的一个个的执行阶段! 11.Task:一个分区上的一系列操作(pipline上的一系列流水线操作)就是一个Task,同一个Stage中的多个Task可以并行执行!(一个Task由一个线程执行),所以也可以这样说:Task(线程)是运行在Executor(进程)中的最小单位! 12.TaskSet:任务集,就是同一个Stage中的各个Task组成的集合!
数据处理流程
Kafka—>SparkStreaming–>各种存储组件
核心计算思想
DStream的本质
DSteam的容错
DSteam的API
Spark scala
3.x 2.12.x
2.x. 2.11.x
1.x. 2.10.x
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。