Spark is a fast, general-purpose, scalable, in-memory big data analytics and computation engine.
It was developed as a general-purpose in-memory parallel computing framework by the AMP Lab (Algorithms, Machines, and People Lab) at UC Berkeley.
Spark is backed by many big data companies, including Hortonworks, IBM, Intel, Cloudera, MapR, Pivotal, Baidu, Alibaba, Tencent, JD.com, Ctrip, and Youku Tudou. At Baidu, Spark is already used in search, Zhida Hao, and Baidu big data services; Alibaba built large-scale graph computation and graph mining systems on GraphX and implemented recommendation algorithms for many production systems; Tencent runs a Spark cluster of 8,000 nodes, the largest known Spark cluster in the world at the time of writing.
RDD: Resilient Distributed Dataset, Spark's core data abstraction.
Spark Core: implements Spark's basic functionality, including modules for task scheduling, memory management, fault recovery, and interaction with storage systems. Spark Core also defines the API for Resilient Distributed Datasets (RDDs); a short sketch of the RDD and Spark SQL APIs follows this list.
Spark SQL: Spark's package for working with structured data. With Spark SQL we can query data using SQL or Apache Hive's HQL dialect. It supports many data sources, such as Hive tables, Parquet, and JSON.
Spark Streaming: Spark's component for stream processing of real-time data. It provides an API for manipulating data streams that closely mirrors the RDD API in Spark Core.
Spark MLlib: a library of common machine learning functionality, including classification, regression, clustering, and collaborative filtering, plus extra support such as model evaluation and data import.
Spark GraphX: the component for graph-parallel computation and graph mining.
Cluster managers: Spark is designed to scale computation efficiently from one node to thousands of nodes. To achieve this while keeping maximum flexibility, Spark can run on a variety of cluster managers, including Hadoop YARN, Apache Mesos, and a simple scheduler that ships with Spark, called the standalone scheduler.
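A minimal sketch of the two components used most often in this article, Spark Core (the RDD API) and Spark SQL, assuming a Spark 2.x SparkSession. The object name, app name, and JSON path are illustrative assumptions only, and running this from a Maven project would also require the spark-sql dependency in addition to spark-core.

import org.apache.spark.sql.SparkSession

object ComponentsDemo {
  def main(args: Array[String]): Unit = {
    // Spark 2.x entry point; gives access to both Spark Core and Spark SQL
    val spark = SparkSession.builder().appName("components-demo").master("local[*]").getOrCreate()

    // Spark Core: the RDD API, reached through the underlying SparkContext
    val rdd = spark.sparkContext.parallelize(Seq("hello spark", "hello scala"))
    rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)

    // Spark SQL: query structured data with SQL (the JSON file path is an assumption)
    val df = spark.read.json("/opt/module/spark-local/examples/src/main/resources/people.json")
    df.createOrReplaceTempView("people")
    spark.sql("SELECT name, age FROM people WHERE age > 20").show()

    spark.stop()
  }
}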
Fast:
Compared with Hadoop MapReduce, Spark's in-memory computation is more than 100x faster, and even its disk-based computation is more than 10x faster. Spark implements an efficient DAG execution engine that processes data flows in memory; intermediate results of a computation stay in memory.
Easy to use:
Spark offers APIs in Java, Python, and Scala, plus more than 80 high-level operators, so users can quickly build all kinds of applications. Spark also ships interactive Python and Scala shells, which make it very convenient to try out solutions against a Spark cluster.
General-purpose:
Spark provides a unified stack: interactive queries (Spark SQL), real-time stream processing (Spark Streaming), machine learning (Spark MLlib), and graph computation (GraphX) can all be used seamlessly within the same application, which reduces both development and maintenance effort and the cost of running separate platforms.
Highly compatible:
Spark integrates easily with other open-source products. For example, it can use Hadoop YARN or Apache Mesos as its resource manager and scheduler, and it can process any data that Hadoop supports, including HDFS and HBase. This matters a lot for users who already run a Hadoop cluster, because they can use Spark's processing power without any data migration.
Deploying a Spark cluster roughly falls into two modes: single-node mode and cluster mode.
Most distributed frameworks support a single-node mode, which is convenient for developers debugging the runtime environment, but it is not used in production. The later sections therefore deploy Spark directly in cluster mode.
The deployment modes Spark currently supports are listed below.
Local mode: a single Spark service deployed locally; good for getting familiar with the Spark directory layout and configuration files, and for debugging scenarios such as running the demo examples.
Standalone mode: Spark's built-in task scheduling mode, in which several Spark machines coordinate scheduling among themselves, but only for Spark's own workloads.
YARN mode: Spark uses Hadoop's YARN component for resource and task scheduling; this is Spark genuinely cooperating with an external system.
Mesos mode: Spark uses the Mesos platform for resource and task scheduling. The Spark client connects to Mesos directly, so no separate Spark cluster needs to be built.
(Mesos is a cluster management platform. It can be thought of as a kind of distributed-system kernel responsible for allocating cluster resources such as CPU, memory, storage, and network. Frameworks such as Spark, Storm, Hadoop, and Marathon can run on Mesos.)
Download the package: official site -> Download -> release archives -> spark-2.1.1 -> spark-2.1.1-bin-hadoop2.7.tgz
Extract the Spark package
wangting@ops01:/opt/software >tar -xf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
Rename the directory
wangting@ops01:/opt/software >cd /opt/module/
wangting@ops01:/opt/module >mv spark-2.1.1-bin-hadoop2.7 spark-local
Directory layout
wangting@ops01:/opt/module >cd spark-local/
wangting@ops01:/opt/module/spark-local >
wangting@ops01:/opt/module/spark-local >ll
total 104
drwxr-xr-x 2 wangting wangting 4096 Apr 26 2017 bin
drwxr-xr-x 2 wangting wangting 4096 Apr 26 2017 conf
drwxr-xr-x 5 wangting wangting 4096 Apr 26 2017 data
drwxr-xr-x 4 wangting wangting 4096 Apr 26 2017 examples
drwxr-xr-x 2 wangting wangting 12288 Apr 26 2017 jars
-rw-r--r-- 1 wangting wangting 17811 Apr 26 2017 LICENSE
drwxr-xr-x 2 wangting wangting 4096 Apr 26 2017 licenses
-rw-r--r-- 1 wangting wangting 24645 Apr 26 2017 NOTICE
drwxr-xr-x 8 wangting wangting 4096 Apr 26 2017 python
drwxr-xr-x 3 wangting wangting 4096 Apr 26 2017 R
-rw-r--r-- 1 wangting wangting 3817 Apr 26 2017 README.md
-rw-r--r-- 1 wangting wangting 128 Apr 26 2017 RELEASE
drwxr-xr-x 2 wangting wangting 4096 Apr 26 2017 sbin
drwxr-xr-x 2 wangting wangting 4096 Apr 26 2017 yarn
Official demo: estimating Pi
wangting@ops01:/opt/module/spark-local >bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[4] /opt/module/spark-local/examples/jars/spark-examples_2.11-2.1.1.jar 20
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/22 10:52:01 INFO SparkContext: Running Spark version 2.1.1
21/07/22 10:52:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/22 10:52:02 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-cee9f744-b8dd-4c75-83be-3884f3b4425b
21/07/22 10:52:02 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
21/07/22 10:52:02 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/22 10:52:02 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/22 10:52:02 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://11.8.37.50:4040
21/07/22 10:52:02 INFO SparkContext: Added JAR file:/opt/module/spark-local/examples/jars/spark-examples_2.11-2.1.1.jar at spark://11.8.37.50:46388/jars/spark-examples_2.11-2.1.1.jar with timestamp 1626922322910
21/07/22 10:52:02 INFO Executor: Starting executor ID driver on host localhost
21/07/22 10:52:04 INFO Executor: Fetching spark://11.8.37.50:46388/jars/spark-examples_2.11-2.1.1.jar with timestamp 1626922322910
21/07/22 10:52:04 INFO TransportClientFactory: Successfully created connection to /11.8.37.50:46388 after 28 ms (0 ms spent in bootstraps)
21/07/22 10:52:04 INFO Utils: Fetching spark://11.8.37.50:46388/jars/spark-examples_2.11-2.1.1.jar to /tmp/spark-86795505-2fa5-4e6e-9331-f26233e462b2/userFiles-174c50ee-3aff-4523-a89b-17910dcd467e/fetchFileTemp632487868763480019.tmp
21/07/22 10:52:04 INFO Executor: Adding file:/tmp/spark-86795505-2fa5-4e6e-9331-f26233e462b2/userFiles-174c50ee-3aff-4523-a89b-17910dcd467e/spark-examples_2.11-2.1.1.jar to class loader
21/07/22 10:52:05 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.386012 s
Pi is roughly 3.140143570071785     # <<<<<<< result output
21/07/22 10:52:05 INFO SparkUI: Stopped Spark web UI at http://11.8.37.50:4040
21/07/22 10:52:05 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/22 10:52:05 INFO MemoryStore: MemoryStore cleared
21/07/22 10:52:05 INFO BlockManager: BlockManager stopped
21/07/22 10:52:05 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/22 10:52:05 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/22 10:52:05 INFO SparkContext: Successfully stopped SparkContext
21/07/22 10:52:05 INFO ShutdownHookManager: Shutdown hook called
21/07/22 10:52:05 INFO ShutdownHookManager: Deleting directory /tmp/spark-86795505-2fa5-4e6e-9331-f26233e462b2
bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[4] /opt/module/spark-local/examples/jars/spark-examples_2.11-2.1.1.jar 20
--class: the main class inside the program jar to run
--master: the master URL; local[4] is used here. The local variants are:
(1) local: no thread count given, so the whole computation runs in a single thread with no parallelism at all
(2) local[K]: use K cores for the computation; for example, local[4] runs with 4 cores
(3) local[*]: set the thread count automatically to the number of CPU cores; e.g. on a 4-core CPU, Spark uses 4 threads
spark-examples_2.11-2.1.1.jar: the program jar to run
20: the input argument passed to the program (the number of slices used when estimating π; more slices mean more samples and a more accurate result; it is just a demo parameter here). A sketch of the computation follows this list.
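For reference, the bundled SparkPi example estimates π with a Monte Carlo method along the following lines. This is a hedged sketch that can be pasted into spark-shell (it assumes the shell's predefined sc), not the exact source of the example.

// estimate Pi by sampling random points in the unit square, as the SparkPi demo does
val slices = 20                      // the trailing "20" argument of spark-submit
val n = 100000 * slices              // total number of random points
val count = sc.parallelize(1 to n, slices).map { _ =>
  val x = math.random * 2 - 1
  val y = math.random * 2 - 1
  if (x * x + y * y <= 1) 1 else 0   // 1 if the point falls inside the unit circle
}.reduce(_ + _)
println(s"Pi is roughly ${4.0 * count / n}")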
Official wordcount example
wordcount counts how many times each word occurs across several files, i.e. it computes word frequencies.
Create a test directory and some files
wangting@ops01:/opt/module/spark-local >mkdir input
wangting@ops01:/opt/module/spark-local >cd input/
wangting@ops01:/opt/module/spark-local/input >echo "hello spark" >> 1.txt
wangting@ops01:/opt/module/spark-local/input >echo "hello scala" >> 1.txt
wangting@ops01:/opt/module/spark-local/input >echo "hello flower" >> 1.txt
wangting@ops01:/opt/module/spark-local/input >echo "hello wangt" >> 1.txt
wangting@ops01:/opt/module/spark-local/input >echo "hello hello" >> 2.txt
wangting@ops01:/opt/module/spark-local/input >echo "hello niubi" >> 2.txt
wangting@ops01:/opt/module/spark-local/input >echo "wang wang" >> 2.txt
wangting@ops01:/opt/module/spark-local/input >echo "wangt ting" >> 2.txt
wangting@ops01:/opt/module/spark-local/input >cat 1.txt
hello spark
hello scala
hello flower
hello wangt
wangting@ops01:/opt/module/spark-local/input >cat 2.txt
hello hello
hello niubi
wang wang
wangt ting
Enter the spark-shell command line
wangting@ops01:/opt/module/spark-local/input >cd ..
wangting@ops01:/opt/module/spark-local >bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/07/22 11:01:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/22 11:01:09 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
21/07/22 11:01:09 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
21/07/22 11:01:10 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://11.8.37.50:4040
Spark context available as 'sc' (master = local[*], app id = local-1626922864098).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
Run the wordcount job
scala> sc.textFile("/opt/module/spark-local/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((ting,1), (scala,1), (hello,7), (flower,1), (spark,1), (niubi,1), (wangt,2), (wang,2))
[Note]: (the Tab key completes names on the spark-shell command line; a step-by-step version of the chain is sketched after the list below)
def textFile(path: String,minPartitions: Int): org.apache.spark.rdd.RDD[String]
textFile() -> reads the data from the local input directory
def flatMap[U](f: String => TraversableOnce[U])(implicit evidence$4: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]
flatMap() -> flattening: split each line on spaces and map it to individual words
def map[U](f: String => U)(implicit evidence$3: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]
map() -> transform each element, mapping each word to a (word, 1) tuple
def reduceByKey(func: (Int, Int) => Int): org.apache.spark.rdd.RDD[(String, Int)]
reduceByKey() -> aggregate the values by key, adding them up
def collect[U](f: PartialFunction[(String, Int),U])(implicit evidence$29: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]
def collect(): Array[(String, Int)]
collect() -> collect the data back to the Driver for display
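To see what each step produces, the same chain can be run one operator at a time in spark-shell; a small sketch (the sample values in the comments are illustrative):

val lines  = sc.textFile("/opt/module/spark-local/input")   // RDD[String], one element per line, e.g. "hello spark"
val words  = lines.flatMap(_.split(" "))                    // RDD[String], one element per word, e.g. "hello"
val pairs  = words.map((_, 1))                              // RDD[(String, Int)], e.g. ("hello", 1)
val counts = pairs.reduceByKey(_ + _)                       // RDD[(String, Int)], summed per key, e.g. ("hello", 7)
counts.collect().foreach(println)                           // bring the result back to the Driver and print it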
Check the web UI
Click a job to see more detailed information
Master & Worker
Driver & Executor
[Note]:
Master and Worker are Spark daemons, i.e. the processes that must be running for Spark to operate normally in certain modes (such as standalone).
Driver and Executor are temporary programs that are started only when a concrete job is submitted to the Spark cluster (see the sketch below).
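As a hedged illustration of that split: the Driver is simply whichever JVM creates the SparkContext, and Executors are launched for it on the Workers at the Master's request. A minimal sketch against the standalone cluster built below; the object name and the memory/core values are assumptions.

import org.apache.spark.{SparkConf, SparkContext}

object DriverExecutorDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("driver-executor-demo")
      .setMaster("spark://ops01:7077")      // the standalone Master; Workers start Executors for this app
      .set("spark.executor.memory", "1g")   // memory per Executor (assumed value)
      .set("spark.cores.max", "4")          // total cores across all Executors (assumed value)
    val sc = new SparkContext(conf)         // this JVM is now the Driver
    println(sc.parallelize(1 to 100).sum()) // the tasks of this job run inside the Executors
    sc.stop()                               // Executors go away; the Master and Worker daemons keep running
  }
}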
Standalone mode uses Spark's own resource scheduling engine: it builds a Spark cluster out of a Master plus Slaves (Workers), and Spark runs on that cluster.
Do not confuse this with "standalone" in Hadoop. Standalone here means building a cluster with Spark alone, without relying on any other framework; the term is relative to YARN and Mesos.
Machine plan (3 machines are enough; 4 are used here):
ops01 11.8.37.50 master|worker
ops02 11.8.36.63 worker
ops03 11.8.36.76 worker
ops04 11.8.36.86 worker
[Note]: put these entries in the /etc/hosts name resolution file
wangting@ops01:/opt/module >cat /etc/hosts
127.0.0.1 ydt-cisp-ops01
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
11.16.0.176 rancher.mydomain.com
11.8.38.123 www.tongtongcf.com
11.8.37.50 ops01
11.8.36.63 ops02
11.8.36.76 ops03
11.8.38.86 ops04
11.8.38.82 jpserver ydt-dmcp-jpserver
wangting@ops01:/opt/module >
Extract the package
wangting@ops01:/opt/software >tar -xf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
Rename the directory
wangting@ops01:/opt/software >mv /opt/module/spark-2.1.1-bin-hadoop2.7 /opt/module/spark-standalone
Configuration file directory
wangting@ops01:/opt/software >cd /opt/module/spark-standalone/conf/
wangting@ops01:/opt/module/spark-standalone/conf >
wangting@ops01:/opt/module/spark-standalone/conf >ll
total 32
-rw-r--r-- 1 wangting wangting 987 Apr 26 2017 docker.properties.template
-rw-r--r-- 1 wangting wangting 1105 Apr 26 2017 fairscheduler.xml.template
-rw-r--r-- 1 wangting wangting 2025 Apr 26 2017 log4j.properties.template
-rw-r--r-- 1 wangting wangting 7313 Apr 26 2017 metrics.properties.template
-rw-r--r-- 1 wangting wangting 865 Apr 26 2017 slaves.template
-rw-r--r-- 1 wangting wangting 1292 Apr 26 2017 spark-defaults.conf.template
-rwxr-xr-x 1 wangting wangting 3960 Apr 26 2017 spark-env.sh.template
Edit the slaves config to define the cluster
wangting@ops01:/opt/module/spark-standalone/conf >mv slaves.template slaves
wangting@ops01:/opt/module/spark-standalone/conf >vim slaves
# limitations under the License.
#
# A Spark Worker will be started on each of the machines listed below.
ops01
ops02
ops03
ops04
Edit the spark-env.sh file to add the master node
wangting@ops01:/opt/module/spark-standalone/conf >mv spark-env.sh.template spark-env.sh
wangting@ops01:/opt/module/spark-standalone/conf >vim spark-env.sh
SPARK_MASTER_HOST=ops01
SPARK_MASTER_PORT=7077
Distribute the spark-standalone directory to every node
wangting@ops01:/opt/module >scp -r spark-standalone ops02:/opt/module/
wangting@ops01:/opt/module >scp -r spark-standalone ops03:/opt/module/
wangting@ops01:/opt/module >scp -r spark-standalone ops04:/opt/module/
Check port 8080 and the Spark processes
# starting the standalone cluster also brings up port 8080 for the Master web UI, so first check whether the port is already in use
wangting@ops01:/home/wangting >sudo netstat -tnlpu|grep 8080
wangting@ops01:/home/wangting >
wangting@ops01:/home/wangting >jps -l | grep spark
wangting@ops01:/home/wangting >
Start the spark-standalone cluster
wangting@ops01:/home/wangting >cd /opt/module/spark-standalone/
wangting@ops01:/opt/module/spark-standalone >sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.master.Master-1-ops01.out
ops04: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops04.out
ops03: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops03.out
ops01: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops01.out
ops02: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops02.out
ops04: failed to launch: nice -n 0 /opt/module/spark-standalone/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://ops01:7077
ops04: JAVA_HOME is not set
ops04: full log in /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops04.out
wangting@ops01:/opt/module/spark-standalone >
wangting@ops01:/opt/module/spark-standalone >sudo netstat -tnlpu|grep 8080
tcp6 0 0 :::8080 :::* LISTEN 57689/java
wangting@ops01:/opt/module/spark-standalone >jps -l | grep spark
57809 org.apache.spark.deploy.worker.Worker
57689 org.apache.spark.deploy.master.Master
wangting@ops01:/opt/module/spark-standalone >
Fixing "JAVA_HOME is not set"
The services did start, but the cluster startup output showed this on ops04: ops04: JAVA_HOME is not set
Switch to the ops04 server
wangting@ops04:/opt/module/spark-standalone >echo $JAVA_HOME
/usr/java8_64/jdk1.8.0_101
wangting@ops04:/opt/module/spark-standalone >vim sbin/spark-config.sh
export JAVA_HOME=/usr/java8_64/jdk1.8.0_101
Switch back to the master (ops01) and restart the services
wangting@ops01:/opt/module/spark-standalone >sbin/stop-all.sh
ops01: stopping org.apache.spark.deploy.worker.Worker
ops03: stopping org.apache.spark.deploy.worker.Worker
ops04: no org.apache.spark.deploy.worker.Worker to stop
ops02: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master
wangting@ops01:/opt/module/spark-standalone >
wangting@ops01:/opt/module/spark-standalone >sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.master.Master-1-ops01.out
ops01: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops01.out
ops03: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops03.out
ops04: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops04.out
ops02: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops02.out
wangting@ops01:/opt/module/spark-standalone >
The earlier warning no longer appears
View the web UI in a browser
Official demo: estimating Pi
wangting@ops01:/opt/module/spark-standalone >bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://ops01:7077 /opt/module/spark-standalone/examples/jars/spark-examples_2.11-2.1.1.jar 20
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/23 15:13:39 INFO SparkContext: Running Spark version 2.1.1
21/07/23 15:13:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/23 15:13:39 INFO SecurityManager: Changing view acls to: wangting
21/07/23 15:13:39 INFO SecurityManager: Changing modify acls to: wangting
21/07/23 15:13:39 INFO SecurityManager: Changing view acls groups to:
21/07/23 15:13:39 INFO SecurityManager: Changing modify acls groups to:
21/07/23 15:13:44 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
21/07/23 15:13:44 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 2.824153 s
Pi is roughly 3.1423635711817854     # <<< result output
21/07/23 15:13:44 INFO SparkUI: Stopped Spark web UI at http://11.8.37.50:4040
21/07/23 15:13:44 INFO StandaloneSchedulerBackend: Shutting down all executors
21/07/23 15:13:44 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
21/07/23 15:13:44 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/23 15:13:44 INFO MemoryStore: MemoryStore cleared
21/07/23 15:13:44 INFO BlockManager: BlockManager stopped
21/07/23 15:13:44 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/23 15:13:44 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/23 15:13:44 INFO SparkContext: Successfully stopped SparkContext
21/07/23 15:13:44 INFO ShutdownHookManager: Shutdown hook called
21/07/23 15:13:44 INFO ShutdownHookManager: Deleting directory /tmp/spark-6547bdc7-5117-4c44-8f14-4328fa38ace6
View the job status in the web UI
Run a job with explicit resource settings
wangting@ops01:/opt/module/spark-standalone >bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://ops01:7077 --executor-memory 8G --total-executor-cores 4 /opt/module/spark-standalone/examples/jars/spark-examples_2.11-2.1.1.jar 20
wangting@ops01:/opt/module/spark-standalone >
Check the new job's status and resource usage in the web UI
Configuring the history server
[Note]: this assumes an HDFS environment is already installed; if not, set one up first. For simple experimentation a local single-node HDFS is enough; for a cluster setup an HDFS cluster is assumed to be available.
Once spark-shell stops, the job page on port 4040 (e.g. http://ops01:4040) no longer shows how finished jobs ran, so during development a history server is usually configured to record job runs.
wangting@ops01:/opt/module/spark-standalone >cd conf/
# edit the config file
wangting@ops01:/opt/module/spark-standalone/conf >mv spark-defaults.conf.template spark-defaults.conf
wangting@ops01:/opt/module/spark-standalone/conf >
wangting@ops01:/opt/module/spark-standalone/conf >vim spark-defaults.conf
#
spark.eventLog.enabled          true
spark.eventLog.dir              hdfs://ops01:8020/directory
# create the /directory directory on HDFS
wangting@ops01:/opt/module/spark-standalone/conf >hdfs dfs -ls /
2021-07-23 15:24:45,730 INFO [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 15 items
drwxr-xr-x - wangting supergroup 0 2021-03-17 11:44 /20210317
drwxr-xr-x - wangting supergroup 0 2021-03-19 10:51 /20210319
drwxr-xr-x - wangting supergroup 0 2021-04-24 17:05 /flume
-rw-r--r-- 3 wangting supergroup 338075860 2021-03-12 11:50 /hadoop-3.1.3.tar.gz
drwxr-xr-x - wangting supergroup 0 2021-05-13 15:31 /hbase
drwxr-xr-x - wangting supergroup 0 2021-05-26 16:56 /origin_data
drwxr-xr-x - wangting supergroup 0 2021-06-10 10:31 /spark-history
drwxr-xr-x - wangting supergroup 0 2021-06-10 10:39 /spark-jars
drwxr-xr-x - wangting supergroup 0 2021-06-10 11:11 /student
drwxr-xr-x - wangting supergroup 0 2021-04-04 11:07 /test.db
drwxr-xr-x - wangting supergroup 0 2021-03-19 11:14 /testgetmerge
drwxr-xr-x - wangting supergroup 0 2021-04-10 16:23 /tez
drwx------ - wangting supergroup 0 2021-04-02 15:14 /tmp
drwxr-xr-x - wangting supergroup 0 2021-04-02 15:25 /user
drwxr-xr-x - wangting supergroup 0 2021-06-10 11:43 /warehouse
wangting@ops01:/opt/module/spark-standalone/conf >hdfs dfs -mkdir /directory
2021-07-23 15:25:14,573 INFO [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
wangting@ops01:/opt/module/spark-standalone/conf >hdfs dfs -ls / | grep directory
drwxr-xr-x - wangting supergroup 0 2021-07-23 15:25 /directory
# edit the spark-env config
wangting@ops01:/opt/module/spark-standalone/conf >vim spark-env.sh
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://ops01:8020/directory -Dspark.history.retainedApplications=30"
wangting@ops01:/opt/module/spark-standalone/conf >
# distribute the config files
wangting@ops01:/opt/module/spark-standalone/conf >scp spark-env.sh spark-defaults.conf ops02:/opt/module/spark-standalone/conf/
wangting@ops01:/opt/module/spark-standalone/conf >scp spark-env.sh spark-defaults.conf ops03:/opt/module/spark-standalone/conf/
wangting@ops01:/opt/module/spark-standalone/conf >scp spark-env.sh spark-defaults.conf ops04:/opt/module/spark-standalone/conf/
# start the history server
wangting@ops01:/opt/module/spark-standalone >sbin/start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.history.HistoryServer-1-ops01.out
wangting@ops01:/opt/module/spark-standalone >
# run a job again
wangting@ops01:/opt/module/spark-standalone >bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://ops01:7077 --executor-memory 8G --total-executor-cores 4 /opt/module/spark-standalone/examples/jars/spark-examples_2.11-2.1.1.jar 20
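As a side note, the same event-log settings from spark-defaults.conf can also be supplied per application through SparkConf; a hedged sketch mirroring the values configured above (the object and app names are assumptions):

import org.apache.spark.{SparkConf, SparkContext}

object HistoryEnabledApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("history-enabled-app")
      .setMaster("spark://ops01:7077")
      .set("spark.eventLog.enabled", "true")                    // write event logs for this application
      .set("spark.eventLog.dir", "hdfs://ops01:8020/directory") // same HDFS directory the history server reads
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 1000).count()   // any job; it will show up in the history server on port 18080
    sc.stop()
  }
}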
Visit the history page at ip:18080
http://11.8.37.50:18080/
Because standalone here is only for experimentation, high availability is not covered and is skipped. In production, deployments in China almost always use YARN mode, while Mesos mode sees some use abroad.
YARN mode needs a Hadoop cluster (HDFS and YARN) prepared in advance; the deployment was covered in an earlier article: "Hadoop introduction and deployment".
Service | ops01(8C32G) | ops02(8C24G) | ops03(8C24G) | ops04(8C24G) | version |
---|---|---|---|---|---|
HDFS | NameNode | DataNode | SecondaryNameNode | DataNode | 3.1.3 |
Yarn | NodeManager | ResourceManager / NodeManager | NodeManager | NodeManager | 3.1.3 |
MapReduce | √ JobHistoryServer | √ | √ | √ | 3.1.3 |
Stop the standalone-mode Spark cluster
wangting@ops01:/opt/module/spark-standalone >sbin/stop-all.sh
ops01: stopping org.apache.spark.deploy.worker.Worker
ops04: stopping org.apache.spark.deploy.worker.Worker
ops03: stopping org.apache.spark.deploy.worker.Worker
ops02: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master
wangting@ops01:/opt/module/spark-standalone >
Extract the Spark package and rename it
wangting@ops01:/opt/module/spark-standalone >cd /opt/software/
wangting@ops01:/opt/software >tar -xf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
wangting@ops01:/opt/software >cd /opt/module/
wangting@ops01:/opt/module >mv spark-2.1.1-bin-hadoop2.7 spark-yarn
wangting@ops01:/opt/module >cd spark-yarn/
Edit the spark-env config
Append the setting YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop (the path of the Hadoop installation's configuration directory)
wangting@ops01:/opt/module/spark-yarn >cd conf/
wangting@ops01:/opt/module/spark-yarn/conf >mv spark-env.sh.template spark-env.sh
wangting@ops01:/opt/module/spark-yarn/conf >vim spark-env.sh
YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop
Start HDFS and YARN
Make sure the HDFS and YARN clusters are running (started from the hadoop/sbin/ directory)
This step differs from one environment to another; it depends on where things are deployed locally and which server hosts each component's master.
wangting@ops01:/opt/module/hadoop-3.1.3 >sbin/start-dfs.sh
Starting namenodes on [ops01]
Starting datanodes
Starting secondary namenodes [ops03]
wangting@ops02:/opt/module/hadoop-3.1.3/sbin >./start-yarn.sh
Starting resourcemanager
Starting nodemanagers
Official demo: estimating Pi
wangting@ops01:/opt/module/spark-yarn >bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
/opt/module/spark-yarn/examples/jars/spark-examples_2.11-2.1.1.jar \
20
2021-07-26 11:41:56,166 INFO spark.SparkContext: Running Spark version 2.1.1
2021-07-26 11:41:56,606 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2021-07-26 11:41:56,784 INFO spark.SecurityManager: Changing view acls to: wangting
2021-07-26 11:41:56,784 INFO spark.SecurityManager: Changing modify acls to: wangting
2021-07-26 11:41:56,785 INFO spark.SecurityManager: Changing view acls groups to:
2021-07-26 11:41:56,786 INFO spark.SecurityManager: Changing modify acls groups to:
2021-07-26 11:41:56,786 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(wangting); groups with view permissions: Set(); users with modify permissions: Set(wangting); groups with modify permissions: Set()
2021-07-26 11:42:21,960 INFO scheduler.DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 0.933 s
2021-07-26 11:42:21,965 INFO scheduler.DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.150791 s
Pi is roughly 3.139429569714785     # <<< result output
2021-07-26 11:42:21,976 INFO server.ServerConnector: Stopped Spark@61edc883{HTTP/1.1}{0.0.0.0:4040}
2021-07-26 11:42:21,977 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@6063d80a{/stages/stage/kill,null,UNAVAILABLE,@Spark}
2021-07-26 11:42:21,977 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@5ae76500{/jobs/job/kill,null,UNAVAILABLE,@Spark}
2021-07-26 11:42:22,010 INFO cluster.YarnClientSchedulerBackend: Stopped
2021-07-26 11:42:22,015 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
2021-07-26 11:42:22,027 INFO memory.MemoryStore: MemoryStore cleared
2021-07-26 11:42:22,027 INFO storage.BlockManager: BlockManager stopped
2021-07-26 11:42:22,033 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
2021-07-26 11:42:22,037 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
2021-07-26 11:42:22,038 INFO spark.SparkContext: Successfully stopped SparkContext
2021-07-26 11:42:22,040 INFO util.ShutdownHookManager: Shutdown hook called
2021-07-26 11:42:22,041 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-d3cf3aec-be6f-41f0-a950-4521641e6179
Check the YARN ResourceManager UI on port 8088
The history of submitted jobs can be seen there
Spark on YARN has two modes, yarn-client and yarn-cluster; the main difference is which node the Driver program runs on.
yarn-client: the Driver runs on the client machine; suited to interactive use and debugging, when you want to see the application's output immediately.
yarn-cluster: the Driver runs inside the ApplicationMaster launched by the ResourceManager; suited to production environments.
wangting@ops01:/opt/module/spark-yarn >bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10
The result can be seen directly in the console output
Client-mode job flow
wangting@ops01:/opt/module/spark-yarn >bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10
Open the job in the UI; the final Pi value appears in its log output
Cluster-mode job flow
Mode | Machines with Spark installed | Processes to start | Belongs to |
---|---|---|---|
Local mode | 1 | none | Spark |
Standalone mode | 3 | Master and Worker | Spark |
YARN mode | 1 | relies on existing YARN and HDFS | Hadoop |
1) Spark history server port: 18080 (compare the Hadoop history server port: 19888)
2) Spark Master web UI port: 8080 (compare the Hadoop NameNode web UI port: 9870 (50070))
3) Spark Master internal communication port: 7077 (compare Hadoop's 8020 (9000))
4) Port for viewing the currently running spark-shell jobs: 4040
5) Hadoop YARN job-monitoring port: 8088
First install the IDEA IDE and prepare the Java environment, the Scala environment, the Maven configuration in IDEA, and so on; these preparation steps are easy to look up.
The Spark shell is mostly used for testing and verifying programs. In production, programs are usually written in an IDE, packaged into a jar, and submitted to the cluster. The most common approach is to create a Maven project and let Maven manage the jar dependencies.
1) Create a Maven project
2) Prepare some wordcount material
3) Add the project dependencies
pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wangting</groupId>
    <artifactId>spark_wt_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>WordCount</finalName>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
The wordcount material files:
1.txt
hello niubi
nihao niubiplus
scala niubi
spark niubi scala spark
2.txt
hello wangting
nihao wang
scala ting
spark wangting scala spark
package com.wangting.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // create the SparkConf configuration
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    // create the SparkContext object
    val sc: SparkContext = new SparkContext(conf)
    //sc.textFile("").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
    // read the external file
    val textRDD: RDD[String] = sc.textFile("E:\\spark_wt_test\\input\\1.txt")
    // split the content that was read and flatten it
    val flatMapRDD: RDD[String] = textRDD.flatMap(_.split(" "))
    // transform the structure of each element in the dataset for counting
    val mapRDD: RDD[(String, Int)] = flatMapRDD.map((_, 1))
    // sum up the number of occurrences of identical words
    val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
    // collect the results of the computation
    val res: Array[(String, Int)] = reduceRDD.collect()
    res.foreach(println)

    /*
    // create the SparkConf configuration
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    // create the SparkContext object
    val sc: SparkContext = new SparkContext(conf)
    // the whole job in one line
    sc.textFile(args(0)).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile(args(1))
    */

    // release resources
    sc.stop()
  }
}
Add the packaging configuration to the pom.xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
        <archive>
            <manifest>
                <mainClass>com.wangting.spark.WordCount</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
Note where the plugin configuration goes: build -> plugins -> plugin
[Note]:
The above is only an outline of the development workflow; in a real scenario every step will differ. The point is to get familiar with which steps the development process goes through.
Prepare IDEA and configure the Maven environment
Create a Maven project
The project can contain an input directory with some sample material, which stands in for fetching data from HDFS when testing the code
Under src/main, also create a scala directory of the same kind as java (marked as Sources Root)
Remember to add the dependencies to the pom file; afterwards open the Maven panel (near the top right) and click the first refresh-like button so the dependencies are downloaded locally
The code can then be kept and maintained under the scala package
Finally, use package under Maven's Lifecycle section to build the jar
After packaging, the project has a target directory; the artifact name is the one defined in the pom (WordCount), so if everything went well there is a WordCount.jar
Finally, upload the jar to the server and run it; that completes the whole development workflow
1. Download the spark-2.1.1 source zip from the official site and unzip it locally
2. In IDEA, when Ctrl-clicking a method cannot resolve the source, a prompt appears: Download Sources / Choose Sources
3. Select Choose Sources and, in the path picker, choose the directory of the unzipped Spark sources
4. After the path is associated, click OK to import, and the source attachment is complete
Because of version issues, if the local operating system is Windows and the program uses anything Hadoop-related (for example writing files to HDFS), the following exception appears
21/07/29 10:07:41 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293)
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
    at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362)
    at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$29.apply(SparkContext.scala:1013)
    at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$29.apply(SparkContext.scala:1013)
    at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:179)
    at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:179)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:179)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:198)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
[Note]: all that matters is that this parameter is configured (typically HADOOP_HOME, or the hadoop.home.dir property, pointing at a directory that contains winutils.exe); whether a real Hadoop deployment exists at that path is irrelevant, as long as the check passes, because the test code only reads local static files. One way to do this from code is sketched below.
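A hedged sketch of the code-level variant: set hadoop.home.dir before the SparkContext is created, pointing at any directory that contains bin\winutils.exe (the C:\hadoop path below is an assumption).

object WordCount {
  def main(args: Array[String]): Unit = {
    // must run before new SparkContext(...); C:\hadoop is a placeholder directory containing bin\winutils.exe
    System.setProperty("hadoop.home.dir", "C:\\hadoop")
    // ... the rest of the WordCount code shown earlier ...
  }
}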
After the environment parameter is added, the error no longer appears.