赞
踩
课程资源:(林子雨)Spark编程基础(Python版)_哔哩哔哩_bilibili
第三次信息化浪潮:以物联网、云计算、大数据为标志
大数据时代到来的原因:
大数据4V特性:
大数据影响:
大数据关键技术:
大数据计算模式:
Hadoop、Spark、Flink、Beam
不是单一软件,是一个生态系统
Hadoop2.0才出现。资源调度管理框架,实现“一个集群多个框架”
以前为了防止资源打架,会独立部署各个计算框架(如1000台机器指定300台部署MapReduce计算框架,300台部署Spark计算框架,以此类推),但这导致开发成本高、集群资源利用率低、底层数据无法共享和无缝集成,YARN的出现解决了这一问题
不是单一软件,是一个生态系统
Hadoop vs Spark,Hadoop存在以下缺点(主要是其中MapReduce的缺点):
Spark有以下优点:
Q:Spark会取代Hadoop吗?
不是单一软件,是一个生态系统(批处理、查询分析、流计算、图计算、机器学习算法库也都有)。Flink是和Spark同一类型的计算框架
本质区别:Spark是基于RDD的批处理模型,Flink是基于一行行的流处理模型(实时性好于Spark Streaming)
Google提出的统一编程接口 Beam SDK,自动翻译成其他引擎。但目前主流:Hadoop+Spark
背景:MapReduce磁盘读写、IO开销大 —>提出Spark:基于内存的计算框架,构建大型的、低延迟的数据分析应用程序
问题:(1)无法无缝共享,需要进行数据格式的转换;(2)维护成本较高;(3)资源利用不充分,无法做统一的资源管理分配
Spark一个软件栈满足不同应用场景需求,如SQL即席查询、实时流式计算、机器学习、图计算。Spark中各个组件可借助于Yarn进行统一资源调度分配管理
技术软件栈,一站式服务
RDD:弹性分布式数据集。分布式内存的一个抽象概念(整个Spark编程最核心的数据抽象),提供了一种高度受限的共享内存模型
DAG:有向无环图。反映RDD之间的依赖关系,RDD操作会形成DAG
Executor:运行在工作节点(Worker Node,从节点)的一个进程(一个进程会派生出很多线程),负责运行具体的任务/Task
应用/Application:用户编写的Spark应用程序
任务/Task:运行在Executor进程上的工作单元(任务控制节点Driver Program)
作业/Job:一个Job包含多个RDD及作用于相应RDD上的各种操作。一个Spark应用程序提交后,就是分解成1到多个Job去完成的
阶段/Stage:作业的基本调度单位。每个Job会被分解成多组Task,每一组Task的集合叫Stage
Application > Job > Stage > Task
Cluster Manager 集群资源管理器:
- 集群资源:CPU、内存、带宽
- 管理器:自带、YARN、Mesos
执行应用时,Driver会向集群资源管理器申请资源,并启动Executor进程,向进程发送应用程序的代码和文件,应用程序会在进程上派发出线程去执行任务,执行结束后将结果返回给Driver,提交给用户/HDFS/关系型数据库等
分布式系统的两种架构:对等架构P2P;一主多从架构(Spark、Hadoop都是这种)
MapReduce不适合处理迭代场景(如逻辑斯蒂回归、模拟退火算法、遗传算法),中间结果反复读写磁盘,磁盘IO开销太大(反复读写工作子集+序列化和反序列化开销)
RDD为了避免这些问题而出现,提供了抽象的数据结构。把具体应用逻辑表达为RDD转换,不同RDD转换之间的依赖关系即DAG图,优化实现数据的管道化(流水线化)处理,即一个操作结束后数据不需要落磁盘,马上输入给下一个操作,避免数据落地
高度受限的共享内存模型会不会影响表达能力?由于RDD提供的转换操作(map、filter、groupBy、join)十分丰富,可以将其组合实现很多功能。实践证明,Spark能力非常强大,虽是高度受限的共享内存模型,但不会影响表达能力。Spark提供了RDD的API,可以通过调用API实现对RDD的各种操作
惰性调用机制:对RDD的转换操作不会真正执行转换,只会记录转换轨迹,并不会真正发生计算。只有遇到第一个动作类型操作才会触发计算,执行从头到尾操作(从磁盘读取数据到输出)
管道化/流水线优化:转换过程中数据不用落地磁盘,直接把一个操作的输出,给另一个操作作为输入,避免了不必要的读写磁盘开销,也无需保存中间结果
MapReuce写代码时,若应用复杂则需要写入非常复杂的代码;但Spark每个操作都很简单,串联起来的操作集合可以完成非常复杂的功能
一个RDD应用会分成多个作业,一个作业会被分成很多阶段
宽依赖:划分成多个阶段,包含shuffle操作(数据的重分区和混洗),即数据在网络上传输(任务的执行涉及磁盘和网络传输,性能较低)
窄依赖:不划分阶段,没有包含shuffle操作(计算可以在同一分区内完成),即数据不需要在网络上传输,性能较高
是否包含shuffle操作是划分宽窄依赖的依据(Spark根据每个转换操作来确定依赖的类型,如果一个操作不会触发shuffle,那么产生的子RDD和父RDD之间就是窄依赖;如果一个操作需要shuffle,那么产生的子RDD与父RDD之间就是宽依赖)
文件保存在HDFS中,进行多任务执行、分区处理。只要发生了shuffle操作,一定发生了来回交互的数据的分发。shuffle操作在网络中大规模地来回传输数据,不同节点之间互相传数据
宽依赖需要拆分成两阶段,窄依赖不用
宽窄依赖与转换类型有关。宽依赖通常与需要对多个分区进行合并或重排的转换操作相关,这些转换操作会导致Shuffle,产生宽依赖;而窄依赖通常与单个分区的转换操作相关,不需要Shuffle,产生窄依赖;
动作操作则触发实际的计算,通常不涉及依赖关系
从一个RDD到另一个RDD的转换都是一个fork+一个join(fork即并行执行分区转换,结果汇总是join)
DAG有向无环图反向解析:遇到窄依赖就不断添加,形成管道化流水线处理;遇到宽依赖就断开,生成新的阶段Stage,因为要发生等待、洗牌(宽依赖生成不同阶段,窄依赖不断加入阶段)
支持单机部署和集群部署。集群部署有以下三种:
Hadoop包含存储框架HDFS、HBase、计算框架MapReduce等。Spark和Hadoop并不对等,而是可能取代MapReduce。Hadoop的HDFS、HBase会继续发挥存储功能,存储数据拿给计算框架Spark来计算分析,它们共同来满足企业的相关应用场景需求
Spark是一种分布式计算框架,存储需要Hadoop,运行在Linux系统上。Spark底层最终编译成Java字节码运行,故需要Java环境
安装Hadoop教程(包含了安装Java):Hadoop安装教程_单机/伪分布式配置_Hadoop2.6.0(2.7.1)/Ubuntu14.04(16.04)_厦大数据库实验室博客
安装Spark:Apache Spark™ - Unified Engine for large-scale data analytics
关于安装步骤我写过一篇详细的帖子,可以参考:PySpark安装及WordCount实现(基于Ubuntu)_如何原谅奋力过但无声的博客-CSDN博客
- # 解压安装包spark-2.4.0-bin-without-hadoop.tgz至路径/usr/local
- # usr是unix software resource
- sudo tar -zxf ~/Downloads/spark-2.4.0-bin-without-hadoop.tgz -C /usr/local/
- cd /usr/local
- sudo mv ./spark-2.4.0-bin-without-hadoop/ ./spark # 更改文件夹名
- sudo chown -R hadoop ./spark # 此处hadoop为系统用户名,把spark目录权限赋予hadoop用户
-
- # 配置Spark的classpath,这样Spark才能跟Hadoop挂接起来
- cd /usr/local/spark
- cp ./conf/spark-env.sh.template ./conf/spark-env.sh # 拷贝配置文件
-
- vim ./conf/spark-env.sh
- # 编辑该配置文件,在第一行加上如下一行内容
- export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
-
- # 保存配置文件后,就可以用Spark去访问。若需要使用HDFS中的文件,则在使用Spark前需要启动Hadoop
-
- # 启动Spark Shell(Scala语言)
- cd /usr/local/spark
- bin/spark-shell
Spark与Hadoop如何交互?Spark部署模式:
注:HDFS的NameNode(数据目录)和DataNode(具体存储数据),一般为一主多从架构,即一个NameNode,其余全为DataNode
Spark单机版和Hadoop伪分布式可以交互,访问HDFS文件;Spark集群部署模式也可以和Hadoop集群部署模式相互访问
PySpark是一个交互式的执行环境(Spark Shell也是一个交互式的执行环境,但它是Scala语言)
开机启动进入Linux环境,Shell中输入命令进入PySpark环境:
- pyspark --master <master-url> # url不同,分别进入四种不同的环境
-
- # 直接pyspark回车,为local[*]模式
Spark的运行模式取决于传递给 SparkContext 的 master-url 的值。Master URL可以是以下任一种形式:
在Spark中采用本地模式启动PySpark的命令主要包含以下参数:
- cd /usr/local/spark
- ./bin/pyspark --master local[4] # 采用本地模式,在4个CPU核心上运行pyspark
- ./bin/pyspark --master local[4] --jars code.jar # 可以在classpath中添加code.jar
执行 pyspark --help 命令,获取完整的选项列表:
- cd /usr/local/spark
- ./bin/pyspark --help
执行如下命令启动pyspark(默认是local模式):
./bin/pyspark
启动pyspark成功后在输出信息的末尾可以看到 >>> 的命令提示符。使用命令 exit() 退出pyspark
- # WordCount.py 统计文本文件中包含a的行的个数和b的行的个数
- from pyspark import SparkConf, SparkContext
-
- conf = SparkConf().setMaster("local").setAppName("My App") # 生成配置的上下文信息
- # MasterURL取值为local模式;通过网页查看管理时可以看到应用名称为My App
-
- sc = SparkContext(conf = conf) # 生成SparkContext对象
-
- logFile = "file:///usr/local/spark/README.md" # 若是本地文件,是file:///
-
- logData = sc.textFile(logFile,2).cache() # 把文本文件加载进来生成RDD
- # RDD里包含很多元素,每个元素对应一行文本
-
- numAs = logData.filter(lambda line: 'a' in line).count() # 过滤出所有包含单词a的行
- # lambda为匿名函数
- # 把包含单词a的行全过滤出来,放在一个新的RDD中,再.count()统计
-
- numBs = logData.filter(lambda line: 'b' in line).count() # 过滤出所有包含单词b的行
-
- print('Lines with a:%s, Lines with b:%s' % (numAs,numBs))
对于这段Python代码,可以直接使用如下命令执行:
- cd /usr/local/spark/mycode/python
- python3 WordCount.py
- spark-submit
- --master <master-url>
- --deploy-mode <deploy-mode> # 部署模式
- ... # 其它参数
- <application-file> # Python代码文件
- [application-arguments] # 传递给主类的主方法的参数
执行 spark-submit --help 命令,获取完整的选项列表:
- cd /usr/local/spark
- ./bin/spark-submit --help
上述代码以这种方式运行:
- # 通过spark-submit提交到Spark中运行
- /usr/local/spark/bin/spark-submit /usr/local/spark/mycode/python/WordCount.py
-
- # 在命令中间使用“\”符号,把一行完整命令人为断开成多行进行输入
- /usr/local/spark/bin/spark-submit \
- /usr/local/spark/mycode/python/WordCount.py
为了避免其他多余信息对运行结果干扰,可以修改log4j的日志信息显示级别,从 log4j.rootCategory = INFO, console 改成 log4j.rootCategory = ERROR, console
假设有3台机器搭建集群:Master、Slave01、Slave02,且在搭建Spark集群之前,Hadoop集群的构建已经完成(Hadoop 2.7分布式集群环境搭建_厦大数据库实验室博客)
一台机器上,既部署了Hadoop的DataNode,又部署了Spark的Worker Node,即HDFS里的DataNode和Spark的Worker Node共存。这样Spark的Worker Node可以对Hadoop的DataNode数据进行本地化计算
主节点为master,在master节点上安装Spark,和单机时步骤一样:
- # 解压安装包spark-2.4.0-bin-without-hadoop.tgz至路径/usr/local
- # usr是unix software resource
- sudo tar -zxf ~/Downloads/spark-2.4.0-bin-without-hadoop.tgz -C /usr/local/
- cd /usr/local
- sudo mv ./spark-2.4.0-bin-without-hadoop/ ./spark # 更改文件夹名
- sudo chown -R hadoop ./spark # 此处hadoop为系统用户名,把spark目录权限赋予hadoop用户
再执行以下命令:
- # 在master节点主机的终端中执行
- vim ~/.bashrc # 隐藏文件
-
- # 在.bashrc添加如下配置
- export SPARK_HOME = /usr/local/spark
- export PATH = $PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
-
- # 运行source命令,使配置生效
- source ~/.bashrc
接下来配置从节点slaves文件:
- # 将slaves.template拷贝到slaves
- cd /usr/local/spark
- cp ./conf/slaves.template ./conf/slaves
-
- # 编辑./conf/slaves,设置WorkerNode,把默认内容localhost替换成如下内容,一行一个
- # 主机名称,从节点位于这两个主机上
- slave01
- slave02
-
- # 配置spark-env.sh文件
- cp ./conf/spark-env.sh.template ./conf/spark-env.sh # 将spark-env.sh.template拷贝到spark-env.sh
-
- # 编辑spark-env.sh,添加如下内容
- export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath) # 完成Spark和Hadoop的挂接
- export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop # 说明Hadoop相关配置信息的目录
- export SPARK_MASTER_IP=192.168.1.104 # 设置Spark管家节点的IP地址
-
- # 将master主机上的/usr/local/spark文件夹复制到各个节点上
- cd /usr/local/
- tar -zcf ~/spark.master.tar.gz ./spark # 把主节点spark安装目录打包成文件
- cd ~
- scp ./spark.master.tar.gz slave01:/home/hadoop # 把压缩包拷贝到两个从节点上
- scp ./spark.master.tar.gz slave02:/home/hadoop
-
- # 在从节点目录下执行解压缩操作
- sudo rm -rf /usr/local/spark/
- sudo tar -zxf ~/spark.master.tar.gz -C /usr/local
- sudo chown -R hadoop /usr/local/spark # 赋权限
开启:在master节点主机上运行
- # 首先启动Hadoop集群
- # Spark集群与Hadoop集群是搭配使用,Hadoop存数据,Spark计算
- cd /usr/local/hadoop/
- sbin/start-all.sh
-
- # 启动master节点,在master节点主机上运行如下命令
- cd /usr/local/spark/
- sbin/start-master.sh
-
- # 启动所有Slave节点,在master节点主机上运行(启动从节点是在主节点上启动)
- sbin/start-slaves.sh
-
- # 在master主机上打开浏览器,访问http://master:8080
关闭Spark集群:
- # 关闭master节点(master节点上运行)
- sbin/stop-master.sh
-
- # 关闭Worker节点(master节点上运行)
- sbin/stop-slaves.sh
-
- # 关闭Hadoop集群
- cd /usr/local/hadoop/
- sbin/stop-all.sh
以下命令均在master节点上运行:
启动Hadoop集群:
- cd /usr/local/hadoop/
- sbin/start-all.sh
启动Spark的master节点和所有slaves节点:
- cd /usr/local/spark/ # 进入spark安装目录
- sbin/start-master.sh # 启动主节点
- sbin/start-slaves.sh # 启动从节点
(1)在集群中运行应用程序JAR包:需要把spark://host:port作为主节点参数传递给spark-submit
用一个程序提交给集群去算Π的值:
- cd /usr/local/spark/
- bin/spark-submit \
- --master spark://master:7077 \ # 连接到Standalone独立集群模式,使用自带的集群资源管理器
- /usr/local/spark/examples/src/main/python/pi.py 2>&1 | grep "Pi is roughly" # 过滤出有用信息
- # 结果为Pi is roughly 3.1415926
(2)在集群中运行Pyspark:
- cd /usr/local/spark/
- bin/pyspark --master spark://master:7077 # Standalone模式连接到Spark集群
-
- # 进入交互式运行环境
- textFile = sc.textFile("hdfs://master:9000/README.md") # 把底层文件(分布式文件系统hdfs里的文件)加载进来生成RDD
- # hdfs是hdfs://,本地文件是file:///
- textFile.count() # 统计有多少行
- textFile.first() # 取出第一行内容
运行后查看集群信息(用户在独立集群管理Web界面查看应用的运行情况):http://master:8080/
(1)spark-submit:确保Hadoop集群已经启动
- cd /usr/local/spark/
- bin/spark-submit \
- --master yarn-client \ # yarn-client用来调试
- /usr/local/spark/examples/src/main/python/pi.py
运行后,根据在shell中得到的输出结果地址查看(tracking URL)。复制结果地址到浏览器,点击查看Logs,再点击stdout,即可查看结果
(2)在集群中运行pyspark:用pyspark连接到采用Yarn作为集群管理器的集群上(交互式)
- bin/pyspark --master yarn # 默认yarn-client模式
-
- # 假设HDFS根目录下已存在一个文件README.md,在pyspark环境中执行相关语句
- textFile = sc.textFile("hdfs://master:9000/README.md")
- textFile.count() # 统计RDD有多少元素
- textFile.first() # 取出第一行内容
在Hadoop Yarn集群管理Web界面(http://master:8088/cluster)查看所有应用的运行情况
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。