The fundamental difference between Spark and Hadoop (MapReduce) is how data is passed between jobs:
Spark passes data between jobs through memory, while Hadoop passes it through disk.
In the vast majority of data-processing scenarios Spark does have an advantage over MapReduce. But because Spark is memory-based, in a real production environment a job may fail when memory resources are insufficient; in that case MapReduce is actually the better choice. So Spark cannot completely replace MapReduce.
1. Spark Core
Spark Core provides Spark's most fundamental and core functionality. The other Spark modules, such as Spark SQL, Spark Streaming, GraphX, and MLlib, are all built on top of Spark Core.
2. Spark SQL
Spark SQL is the Spark component for working with structured data. With Spark SQL, users can query data with SQL or HQL, Hive's SQL dialect (a small example follows the module list below).
3. Spark Streaming
Spark Streaming is the Spark component for stream processing of real-time data; it provides a rich API for working with data streams.
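As a rough illustration of how Spark SQL is used (this example is not part of the original walkthrough; the file path data/people.json and the column names name/age are placeholders, and it additionally requires the spark-sql dependency):

import org.apache.spark.sql.SparkSession

object SparkSqlSketch {
    def main(args: Array[String]): Unit = {
        // Spark SQL programs start from a SparkSession instead of a bare SparkContext
        val spark = SparkSession.builder().master("local[*]").appName("SparkSqlSketch").getOrCreate()

        // Read a structured file into a DataFrame (the path is a placeholder)
        val df = spark.read.json("data/people.json")

        // Register a temporary view and query it with ordinary SQL
        df.createOrReplaceTempView("people")
        spark.sql("SELECT name, age FROM people WHERE age > 20").show()

        spark.stop()
    }
}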
WordCount written with Spark's own method (reduceByKey):
package com.zxy.SparkCore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount2 {
    def main(args: Array[String]): Unit = {
        // Set up the connection to the Spark framework
        val wordCount: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val context: SparkContext = new SparkContext(wordCount)

        // Read the data under the given directory
        val lines: RDD[String] = context.textFile("spark-core\\dates")

        // Split the lines into words
        val words: RDD[String] = lines.flatMap(_.split("\\s+"))

        // Map each word to a (word, 1) pair
        val WordToOne: RDD[(String, Int)] = words.map(
            word => (word, 1)
        )

        // A method provided by Spark that does grouping and aggregation in a single call
        // reduceByKey: values that share the same key are reduced (aggregated) together
        val WordToCount: RDD[(String, Int)] = WordToOne.reduceByKey(_ + _)

        // Collect the results to the driver
        val array: Array[(String, Int)] = WordToCount.collect()

        // Print the results
        array.foreach(println)

        // Close the connection
        context.stop()
    }
}
Simplified version
package com.zxy.SparkCore

import org.apache.spark.{SparkConf, SparkContext}

object WordCount4 {
    def main(args: Array[String]): Unit = {
        // Set up the connection to the Spark framework
        val wordCount: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val context: SparkContext = new SparkContext(wordCount)

        context.textFile("spark-core\\dates")
            .flatMap(_.split("\\s+"))
            .map(word => (word, 1))
            .reduceByKey(_ + _)
            .collect()
            .foreach(println)

        // Close the connection
        context.stop()
    }
}
Console output:
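The exact console output depends on the files under spark-core\dates. Assuming, purely for illustration, that they contain the lines "Hello Spark" and "Hello Scala", the program would print word-count pairs such as:

(Hello,2)
(Spark,1)
(Scala,1)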
I am using Scala 2.11.8 and Spark 2.4.7 here; the Maven dependencies are:
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scalap</artifactId>
        <version>2.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.7</version>
    </dependency>
</dependencies>
log4j.properties configuration (to quiet Spark's console logging):

log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.logger.org.apache.spark.repl.Main=WARN
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
log4j.logger.org.apache.parquet.CorruptStatistics=ERROR
log4j.logger.parquet.CorruptStatistics=ERROR
The Hadoop version used here is 2.8.1 and the Spark version is 3.0.2.
## Extract the archive
[root@hadoop software]# tar -zxvf spark-3.0.2-bin-hadoop3.2.tgz -C /opt/

## Start the Spark shell (bin/spark-shell)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.2
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.

## Web UI
http://192.168.130.129:4040/jobs/
Quick start
scala> sc.textFile("data/date.txt").flatMap(_.split("\\s+")).groupBy(word => word).map(vk => (vk._1,vk._2.size)).collect().foreach(println)
(Spark,1)
(Hello,2)
(Scala,1)
scala> sc.textFile("data/date.txt").flatMap(_.split("\\s+")).map(word => (word,1)).reduceByKey(_ + _).collect().foreach(println)
(Spark,1)
(Hello,2)
(Scala,1)
## Package the Maven project, upload the jar to the Spark machine, and submit it with spark-submit:
bin/spark-submit \
--class com.zxy.SparkCore.WordCount4 \
--master local[2] \
/opt/apps/spark-3.0.2/data/spark-core-1.0-SNAPSHOT.jar \
10
The following socket example simulates how they communicate: the Driver plays the role of the client, and the Executor plays the role of the server.
package com.zxy.Socket

import java.io.OutputStream
import java.net.Socket

object Driver {
    def main(args: Array[String]): Unit = {
        // Connect to the server
        val client: Socket = new Socket("localhost", 9999)

        // Send the data
        val out: OutputStream = client.getOutputStream
        out.write(2)
        out.flush()

        out.close()
        client.close()
    }
}
package com.zxy.Socket

import java.io.InputStream
import java.net.{ServerSocket, Socket}

object Executor {
    def main(args: Array[String]): Unit = {
        // Start the server and wait for data
        val server: ServerSocket = new ServerSocket(9999)
        println("Server started, waiting for data")

        // Wait for a client to connect, then read the data
        val client: Socket = server.accept()
        val in: InputStream = client.getInputStream
        val i: Int = in.read()
        println(s"Received data from client: ${i}")

        client.close()
        server.close()
    }
}
Start the server side (Executor) first, so that it is waiting for data;
then start the client side (Driver), which establishes the connection and sends the data.
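Once both are running, the Executor console should print something like the following (the value 2 is simply the single byte written by out.write(2)):

Server started, waiting for data
Received data from client: 2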
Now modify the example above to use two server-side Executors to receive data, so that the data in the Task is split and computed in two parts.
package com.zxy.Socket

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

object Executor1 {
    def main(args: Array[String]): Unit = {
        // Start the server and wait for data
        val server: ServerSocket = new ServerSocket(8888)
        println("Server started, waiting for data")

        // Wait for a client to connect, then read the serialized sub-task
        val client: Socket = server.accept()
        val in: InputStream = client.getInputStream
        val TaskOBJ2: ObjectInputStream = new ObjectInputStream(in)
        val task: SubTask = TaskOBJ2.readObject().asInstanceOf[SubTask]
        val ints: List[Int] = task.computer()
        println(s"Result computed on [8888]: ${ints}")

        TaskOBJ2.close()
        client.close()
        server.close()
    }
}
package com.zxy.Socket

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

object Executor2 {
    def main(args: Array[String]): Unit = {
        // Start the server and wait for data
        val server: ServerSocket = new ServerSocket(9999)
        println("Server started, waiting for data")

        // Wait for a client to connect, then read the serialized sub-task
        val client: Socket = server.accept()
        val in: InputStream = client.getInputStream
        val TaskOBJ1: ObjectInputStream = new ObjectInputStream(in)
        val task: SubTask = TaskOBJ1.readObject().asInstanceOf[SubTask]
        val ints: List[Int] = task.computer()
        println(s"Result computed on [9999]: ${ints}")

        TaskOBJ1.close()
        client.close()
        server.close()
    }
}
package com.zxy.Socket

import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket

object Driver {
    def main(args: Array[String]): Unit = {
        // Connect to both servers
        val client1: Socket = new Socket("localhost", 8888)
        val client2: Socket = new Socket("localhost", 9999)

        val task: Task = new Task()

        // Send the first half of the data to server 1
        val out1: OutputStream = client1.getOutputStream
        val TaskOBJ1: ObjectOutputStream = new ObjectOutputStream(out1)
        val subTask1 = new SubTask()
        subTask1.logic = task.logic
        subTask1.datas = task.datas.take(2)
        TaskOBJ1.writeObject(subTask1)
        TaskOBJ1.flush()
        TaskOBJ1.close()
        client1.close()

        // Send the second half of the data to server 2
        val out2: OutputStream = client2.getOutputStream
        val TaskOBJ2: ObjectOutputStream = new ObjectOutputStream(out2)
        val subTask2 = new SubTask()
        subTask2.logic = task.logic
        subTask2.datas = task.datas.takeRight(2)
        TaskOBJ2.writeObject(subTask2)
        TaskOBJ2.flush()
        TaskOBJ2.close()
        client2.close()

        println("All data sent")
    }
}
package com.zxy.Socket
class Task extends Serializable {
    // Full data set to be processed
    val datas = List(1, 2, 3, 4)
    // Computation logic: double every element
    val logic: Int => Int = _ * 2
}
package com.zxy.Socket
class SubTask extends Serializable {
    // Initial values, filled in by the Driver before sending
    var datas: List[Int] = _
    var logic: Int => Int = _

    // Computation: apply the logic to every element of this sub-task's data
    def computer() = {
        datas.map(logic)
    }
}
Start Executor1 and Executor2 first;
then start the Driver.
Executor1:
Server started, waiting for data
Result computed on [8888]: List(2, 4)
Executor2:
Server started, waiting for data
Result computed on [9999]: List(6, 8)
Driver:
All data sent
To support highly concurrent, high-throughput data processing, the Spark computing framework encapsulates three core data structures, each targeting a different kind of use case. The three data structures are:
RDD: resilient distributed dataset
Accumulator: a distributed shared write-only variable
Broadcast variable: a distributed shared read-only variable (a minimal sketch of the latter two follows this list)
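The RDD examples below come from the original code; for the other two abstractions, here is a minimal sketch (not part of the original; the names AccBroadcastSketch, sumAcc, and lookupBc are made up for illustration) showing a long accumulator and a broadcast variable in a local job:

import org.apache.spark.{SparkConf, SparkContext}

object AccBroadcastSketch {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("AccBroadcastSketch")
        val sc = new SparkContext(conf)

        // Accumulator: executors only add to it ("write-only"); the driver reads the final value
        val sumAcc = sc.longAccumulator("sumAcc")

        // Broadcast variable: a read-only lookup table shipped once to every executor
        val lookupBc = sc.broadcast(Map("a" -> 1L, "b" -> 2L))

        sc.makeRDD(List("a", "b", "a"))
          .foreach(key => sumAcc.add(lookupBc.value.getOrElse(key, 0L)))

        // Prints 4 (1 + 2 + 1)
        println(sumAcc.value)

        sc.stop()
    }
}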
package Rdd.Rebuilder

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * file: Spark_RDD_Memary
 * author: zxy
 * date: 2021-06-07
 * desc: create an RDD from an in-memory collection
 */
object Spark_RDD_Memary {
    def main(args: Array[String]): Unit = {
        // TODO set up the environment
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc: SparkContext = new SparkContext(conf)

        // TODO create the RDD
        // Create an RDD from memory, using an in-memory collection as the data source
        val seq = Seq[Int](1, 2, 3, 4)

        // parallelize: distribute the collection
        // val rdd: RDD[Int] = sc.parallelize(seq)

        // makeRDD does the same thing as parallelize; under the hood it simply calls parallelize
        val rdd: RDD[Int] = sc.makeRDD(seq)

        rdd.collect().foreach(println)

        // TODO shut down the environment
        sc.stop()
    }
}
package Rdd.Rebuilder

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * file: Spark_RDD_File
 * author: zxy
 * date: 2021-06-07
 * desc: create an RDD from an external file
 */
object Spark_RDD_File {
    def main(args: Array[String]): Unit = {
        // TODO set up the environment
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc: SparkContext = new SparkContext(conf)

        // TODO create the RDD
        // Create an RDD from a file, using the file's contents as the data source
        // 1. path is resolved against the project root by default; it can be absolute or relative
        // 2. path can point to a single file or to a directory
        // 3. path can contain wildcards
        // 4. path can also point to a distributed file system such as HDFS
        val rdd: RDD[String] = sc.textFile("spark-core/dates/1.txt")

        rdd.collect().foreach(println)

        // TODO shut down the environment
        sc.stop()
    }
}
package Rdd.Rebuilder

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * file: Spark_RDD_Files
 * author: zxy
 * date: 2021-06-07
 * desc: create an RDD from external files
 */
object Spark_RDD_Files {
    def main(args: Array[String]): Unit = {
        // TODO set up the environment
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc: SparkContext = new SparkContext(conf)

        // TODO create the RDD
        // Create an RDD from files, using the files' contents as the data source
        // textFile: reads data line by line; every element is a line of text
        // wholeTextFiles: reads data file by file; every element is a (path, content) pair
        val rdd: RDD[(String, String)] = sc.wholeTextFiles("spark-core/dates")

        rdd.collect().foreach(println)

        // TODO shut down the environment
        sc.stop()
    }
}
package Rdd.Rebuilder

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * file: Spark_RDD_File_Par
 * author: zxy
 * date: 2021-06-07
 * desc: RDD parallelism and partitioning
 */
object Spark_RDD_File_Par {
    def main(args: Array[String]): Unit = {
        // TODO set up the environment
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")

        // Option 2: set the default parallelism to 5, so 5 partitions are produced
        conf.set("spark.default.parallelism", "5")

        val sc: SparkContext = new SparkContext(conf)

        // TODO create the RDD
        // RDD parallelism & partitioning
        // makeRDD takes an optional second parameter: the number of partitions
        // If it is omitted, makeRDD uses the default value defaultParallelism:
        //   scheduler.conf.getInt("spark.default.parallelism", totalCores)
        // i.e. Spark first looks up the config entry spark.default.parallelism;
        // if it is not set, it falls back to totalCores, the maximum number of cores available to this run

        // Option 1: explicitly request two partitions
        // val rdd = sc.makeRDD(
        //     List(1, 2, 3, 4), 2
        // )

        // Option 2: rely on the default partitioning
        val rdd = sc.makeRDD(
            List(1, 2, 3, 4)
        )

        // Save the data as partitioned files (one file per partition)
        rdd.saveAsTextFile("spark-core/dates/output")

        // TODO shut down the environment
        sc.stop()
    }
}
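To double-check how many partitions an RDD actually gets, you can print rdd.getNumPartitions before saving; each partition becomes one part-XXXXX file in the output directory. A minimal, self-contained check (the object name is made up; assumes a local[*] master):

import org.apache.spark.{SparkConf, SparkContext}

object Spark_RDD_Partition_Check {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("PartitionCheck")
        val sc = new SparkContext(conf)

        // With no explicit partition count and no spark.default.parallelism set,
        // makeRDD falls back to the total number of cores available to this run
        val rdd = sc.makeRDD(List(1, 2, 3, 4))
        println(rdd.getNumPartitions)

        // With an explicit partition count, exactly that many partitions (and output files) are produced
        val rdd2 = sc.makeRDD(List(1, 2, 3, 4), 2)
        println(rdd2.getNumPartitions)   // 2

        sc.stop()
    }
}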