当前位置:   article > 正文

spark小结_spark 下载 node 日志

spark 下载 node 日志

Spark基本组成

image-20210526105334490

一 Spark几种常见运行模式

1.1 local模式 – 所有程序都运行在一个JVM中,主要用于开发时测试

image-20210526111045428

此模式下,
1. 这个SparkSubmit进程又当爹、又当妈,既是客户提交任务的Client进程、又是Spark的driver程序、还充当着Spark执行Task的Executor角色
2.程序的运行状态可通过 http://:4040 查看,但是这是临时的,程序运行完后,这个UI也就失效了。我们可以启动Spark History Server,这
样就可以看到历史运行程序的信息了。

1.2 Standalone-独立集群

image-20210526113836100

需要先启动Spark的Master和Worker守护进程,提交一个任务的命令如下:

./bin/spark-submit --class org.apache.spark.example.SparkPi --master spark://node01:7077 ./examples/jars/spark-examples_2.11-2.1.1.jar 100

此模式下,1. 会在所有有Worker进程的节点上启动Executor来执行应用程序。
\2. Master进程做为cluster manager,用来对应用程序申请的资源进行管理;
\3. SparkSubmit 做为Client端和运行driver程序;
\4. 运行结果在Shell里可见
注意,Worker进程生成几个Executor,每个Executor使用几个core,这些都可以在spark-env.sh里面配置

需要配置文件:

1.slaves 文件

2.spark-env.sh

## 设置JAVA安装目录
JAVA_HOME=/export/server/jdk

## HADOOP软件配置文件目录,读取HDFS上文件和运行Spark在YARN集群时需要,先提前配上
HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
YARN_CONF_DIR=/export/server/hadoop/etc/hadoop

## 指定spark老大Master的IP和提交任务的通信端口
SPARK_MASTER_HOST=node1
SPARK_MASTER_PORT=7077

SPARK_MASTER_WEBUI_PORT=8080

SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=1g
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

总结:

spark: 4040 任务运行web-ui界面端口

spark: 8080 spark集群web-ui界面端口

spark: 7077 spark提交任务时的通信端口

hadoop: 50070集群web-ui界面端口

hadoop:8020/9000(老版本) 文件上传下载通信端口

1.3 Spark-On-Yarn

原理:

1、SparkOnYarn的本质是把spark任务的class字节码文件打成jar包,上传到Yarn集群的JVM中去运行

2、Spark集群的相关角色(JVM进程)也会在Yarn的JVM中运行

3.SparkOnYarn需要:

-修改一些配置,让支持SparkOnYarn

-Spark程序打包成jar包

-Spark任务提交工具:bin/spark-submit

-Spark本身依赖的jars:在spark的安装目录的jars中有,提交任务的时候会被上传到Yarn/HDFS,或手动提前上传

4.SparkOnYarn 不需要Spark集群,只需要一个单机版spark解压包即可

5.SparkOnYarn根据Driver运行在哪里分为两种模式:client模式和cluster模式

1.3.1配置Yarn

vim /export/server/hadoop/etc/hadoop/yarn-site.xml

<configuration>
    <!-- 配置yarn主节点的位置 -->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>node1</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <!-- 设置yarn集群的内存分配方案 -->
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>20480</value>
    </property>
    <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>2048</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-pmem-ratio</name>
        <value>2.1</value>
    </property>
    <!-- 开启日志聚合功能 -->
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>
    <!-- 设置聚合日志在hdfs上的保存时间 -->
    <property>
        <name>yarn.log-aggregation.retain-seconds</name>
        <value>604800</value>
    </property>
    <!-- 设置yarn历史服务器地址 -->
    <property>
        <name>yarn.log.server.url</name>
        <value>http://node1:19888/jobhistory/logs</value>
    </property>
    <!-- 关闭yarn内存检查 -->
    <property>
        <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
</configuration>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

1.3.2配置Spark的历史服务器与Yarn整合

  • 修改spark-defaults.conf

进入配置目录

cd /export/server/spark/conf

修改配置文件名称

mv spark-defaults.conf.template spark-defaults.conf

vim spark-defaults.conf

添加内容:

spark.eventLog.enabled                  true
spark.eventLog.dir                      hdfs://node1:8020/sparklog/
spark.eventLog.compress                 true
spark.yarn.historyServer.address        node1:18080
  • 1
  • 2
  • 3
  • 4
  • 修改spark-env.sh

修改配置文件

vim /export/server/spark/conf/spark-env.sh

增加如下内容:

## 配置spark历史日志存储地址
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1:8020/sparklog/ -Dspark.history.fs.cleaner.enabled=true"
  • 1
  • 2

注意:sparklog需要手动创建

hadoop fs -mkdir -p /sparklog

  • 修改日志级别

进入目录

cd /export/server/spark/conf

修改日志属性配置文件名称

mv log4j.properties.template log4j.properties

改变日志级别

vim log4j.properties

修改内容如下:

image-20210526142506862

1.3.4 配置依赖的Spark的jar包

1.在HDFS上创建存储spark相关jar包的目录

hadoop fs -mkdir -p /spark/jars/

2.上传$SPARK_HOME/jars所有jar包到HDFS

hadoop fs -put /export/server/spark/jars/* /spark/jars/

3.在node1上修改spark-defaults.conf

vim /export/server/spark/conf/spark-defaults.conf

添加内容

spark.yarn.jars hdfs://node1:8020/spark/jars/*

1.3.5 启动服务

start-all.sh

-启动MRHistoryServer服务,在node1执行命令

mr-jobhistory-daemon.sh start historyserver

- 启动Spark HistoryServer服务,,在node1执行命令

/export/server/spark/sbin/start-history-server.sh

- MRHistoryServer服务WEB UI页面:

http://node1:19888

- Spark HistoryServer服务WEB UI页面:

http://node1:18080/

1.3.6 两种模式

image-20210526144049200

cluster模式-开发使用

image-20210526144543005

操作

1、需要Yarn集群

2、历史服务器

3、提交任务的客户端工具-spark-submit命令

4、待提交的spark任务/程序的字节码–可以使用示例程序

启动服务

- 启动HDFS和YARN服务,在node1执行命令

start-dfs.sh

start-yarn.sh

start-all.sh

-启动MRHistoryServer服务,在node1执行命令0

mr-jobhistory-daemon.sh start historyserver

- 启动Spark HistoryServer服务,,在node1执行命令

/export/server/spark/sbin/start-history-server.sh

- MRHistoryServer服务WEB UI页面:

http://node1:19888

- Spark HistoryServer服务WEB UI页面:

http://node1:18080/

client模式

SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master yarn  \
--deploy-mode client \
--driver-memory 512m \
--driver-cores 1 \
--executor-memory 512m \
--num-executors 2 \
--executor-cores 1 \
--class org.apache.spark.examples.SparkPi \
${SPARK_HOME}/examples/jars/spark-examples_2.12-3.0.1.jar \
10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

cluster模式

SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--num-executors 1 \
--class org.apache.spark.examples.SparkPi \
${SPARK_HOME}/examples/jars/spark-examples_2.12-3.0.1.jar \
10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

二、RDD

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.

RDD:弹性分布式数据集,是Spark中最基本的数据抽象,用来表示分布式集合,支持分布式操作!

《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-memory Cluster Computing》

2.1 五大特性

Internally, each RDD is characterized by five main properties:

  • 分区列表: A list of partitions

  • 计算函数: A function for computing each split

  • 依赖关系: A list of dependencies on other RDDs

  • 分区器: Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

  • 计算位置:Optionally, a list of preferred locations to compute each split on (e.g. block locations for
    an HDFS file)

2.2RDD的创建

image-20210527101202045

2.3 RDD操作

分类

1、Transformation:return new RDD (which create a new dataset from an existing one)

Lazy function need Action activate

2、Action: which return a value to the driver program after running a computation on the dataset

(Count, first, collect,take)

2.4 基本算子

map

faltMap

filter

foreach

saveAsTextFile

2.5 分区操作

每个RDD由多分区组成的,实际开发中如果涉及到资源相关操作建议对每个分区进行操作,即

使用mapPartitions代替map函数

使用foreachPartition代替foreach函数

2.6重分区操作

1、减少/增加分区:repartition

2、减少分区:coalesce

2.7 聚合操作

2.7.1 没ke y的聚合函数API

sum

reduce

fold

aggregate

//aggregate(初始值)(局部聚合, 全局聚合)

底层一般用aggregate实现

2.7.2 有ke y的聚合函数

在spark中有一个object对象PairRDDFunctions, 主要针对RDD的数据类型是key/Value分析处理

比如使用过的函数:reduceByKey、groupByKey

*Bykey函数:将相同key的value进行聚合操作,省去先分组再聚合

有key的聚合函数API如下:

groupByKey + sum/reduce

reduceByKey

foldByKey

aggregateByKey

2.8关联操作

image-20210527105634792

2.9 排序操作

sortBy

sortByKey

top

2.10 RDD的缓存/持久化

​ 引入

​ 缓存解决了热点数据频繁访问的的效率问题

​ 在Spark开发中某些RDD的计算或转换可能会比较费时间。如果这些RDD后续还会被频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次使用到的时候就不用重新计算了,提高了程序运行的效率

image-20210527110528213

2.11RDD的checkpoint

​ 引入

​ RDD的数据可以持久化,但是持久化/缓存可以把数据放在内存中,速度是快速度的,但是也是最不可靠的;也可以把数据放在磁盘上,但是磁盘可能会损坏

​ Checkpoint的产生就是为了更加可靠的数据持久化,在checkpoint的时候一般把数据放在HDFS上,就就天然的借助了HDFS天生的高容错性、高可靠性来实现数据的最大程度的安全,实现了RDD的容错和高可用性

API

sc.setCheckpointDir(HDFS路径) // 设置checkpoint路径,开发中一般设置为HDFS的目录

Add.chectpoint // 对计算复杂且后续会被频繁使用的RDD进行checkpoint

注意:缓存/持久化和Checkpoint检查点的区别

1.存储位置

缓存/持久化数据存默认存在内存, 一般设置为内存+磁盘(普通磁盘)

Checkpoint检查点:一般存储在HDFS

2.功能

缓存/持久化:保证数据后续使用的效率高

Checkpoint检查点:保证数据安全/也能一定程度上提高效率

3.对于依赖关系:

缓存/持久化:保留了RDD间的依赖关系

Checkpoint检查点:不保留RDD间的依赖关系

4.开发中如何使用?

对于计算复杂且后续会被频繁使用的RDD先进行缓存/持久化,再进行Checkpoint

三、Spark内核原理

3.1 依赖关系(宽窄依赖)

  • 宽依赖:有shuffle
  • 父RDD的一个分区会被子RDD的多个分区所依赖–正确

image-20210527113601807

  • 窄依赖:没有shuffle
  • 父RDD的一个分区只会被子RDD的1个分区所依赖–正确

image-20210527113759375

image-20210527114514522

3.2 DAG和Stage

DAG

Spark的DAG:就是spark任务/程序执行的流程图!

DAG的开始:从创建RDD开始

DAG的结束:到Action结束

一个Spark程序中有几个Action操作就有几个DAG!

Stage

image-20210527115006391

Stage:是DAG中根据shuffle划分出来的阶段!

前面的阶段执行完才可以执行后面的阶段!

同一个阶段中的各个任务可以并行执行无需等待!

image-20210527115135592

1.Application:应用,就是程序员编写的Spark代码,如WordCount代码

2.Driver:驱动程序,就是用来执行main方法的JVM进程,里面会执行一些Drive端的代码,如创建SparkContext,设置应用名,设置日志级别...

3.SparkContext:Spark运行时的上下文环境,用来和ClusterManager进行通信的,并进行资源的申请、任务的分配和监控等

4.ClusterManager:集群管理器,对于Standalone模式,就是Master,对于Yarn模式就是ResourceManager/ApplicationMaster,在集群上做统一的资源管理的进程

5.Worker:工作节点,是拥有CPU/内存等资源的机器,是真正干活的节点

6.Executor:运行在Worker中的JVM进程!

7.RDD:弹性分布式数据集

8.DAG:有向无环图,就是根据Action形成的RDD的执行流程图---静态的图

9.Job:作业,按照DAG进行执行就形成了Job---按照图动态的执行

10.Stage:DAG中,根据shuffle依赖划分出来的一个个的执行阶段!

11.Task:一个分区上的一系列操作(pipline上的一系列流水线操作)就是一个Task,同一个Stage中的多个Task可以并行执行!(一个Task由一个线程执行),所以也可以这样说:Task(线程)是运行在Executor(进程)中的最小单位!

12.TaskSet:任务集,就是同一个Stage中的各个Task组成的集合!
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

Job提交执行流程

image-20210527115646549

四、SparkStreaming

数据处理流程

Kafka—>SparkStreaming–>各种存储组件

image-20210527120123641

核心计算思想

image-20210527141940253

DStream的本质

image-20210527142511525

image-20210527143031956

DSteam的容错

image-20210527143122619

DSteam的API

Scala版本对应

Spark scala

3.x 2.12.x

2.x. 2.11.x

1.x. 2.10.x

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/888123
推荐阅读
相关标签
  

闽ICP备14008679号