赞
踩
Spark框架核心是一个计算引擎,采用了标准的master-slave标准。其中Driver表示Master,负责管理整个集群的作业任务调度。Excutor是slave,负责实际执行任务。
Spark应用程序提交到Yarn环境中执行的时候,一般会有两种部署执行的方式:Client和Cluster。两种模式,主要区别在于:Driver程序的运行节点。
概念 | 解释 |
---|---|
client | 申请运行Job任务的设备或程序 |
server | 处理客户端请求,响应结果 |
master | 集群的管理者,负责处理提交到集群上的Job,类比为RM |
worker | 工作者!实际负责接受master分配的任务,运行任务,类比为NM |
driver | 创建SparkContext的程序,称为Driver程序 |
executor | 计算器,计算者。是一个进程,负责Job核心运算逻辑的运行! |
task | 计算任务! task是线程级别!在一个executor中可以同时运行多个task |
并行度 | 取决于executor申请的core数 |
application | 可以提交SparkJob的应用程序,在一个application中,可以调用多次行动算子,每个行动算子都是一个Job! |
Job | 一个Spark的任务,在一个Job中,Spark又会将Job划分为若干阶段(stage),在划分阶段时,会使用DAG调度器,将算子按照特征(是否shuffle)进行划分。 |
stage | 阶段。一个Job的stage的数量= shuffle算子的个数+1。 只要遇到会产生shuffle的算子,就会产生新的阶段! 阶段划分的意义: 同一个阶段的每个分区的数据,可以交给1个Task进行处理! |
object WordCount { def main(args: Array[String]): Unit = { val conf=new SparkConf().setMaster("local").setAppName("My app") //手动创建应用的上下文 val sparkContext = new SparkContext(conf) // 进行wc的编程 // 创建RDD RDD[String]: RDD[一行内容] HadoopRDD val rdd: RDD[String] = sparkContext.textFile("input") // RDD[String]: RDD[单词] val rdd1: RDD[String] = rdd.flatMap(line => line.split(" ")) // RDD[(String, Int)]: RDD[(单词,1)] val rdd2: RDD[(String, Int)] = rdd1.map(word => (word, 1)) // reduceByKey: 将相同key对应的values进行reduce运算! val result: RDD[(String, Int)] = rdd2.reduceByKey((value1, value2) => value1 + value2) // 对结果进行输出 // println(result.collect().mkString(",")) // println(result.collect().mkString(",")) result.saveAsTextFile("output") // 运算完成后,需要关闭sparkContext sparkContext.stop() } }
object WordCount2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("My app") //手动创建应用的上下文 val sparkContext = new SparkContext(conf) //进行wc的编程 //创建RDD RDD[String]:RDD[一行内容] val result:RDD[(String,Int)]=sparkContext.textFile("sparkday01/input") .flatMap(_.split(" ")) .map((_,1)) .reduceByKey(_+_) //对结果进行打印输出 println(result.collect().mkString(",")) //关闭sparkContext sparkContext.stop() } }
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
def testMakeRDD()={
val list = List(1,2,3,4,5,6)
val rdd1 = sparkContext.makeRDD(list,5)
val rdd2 = sparkContext.parallelize(List(1,2,3,4))
}
从底层代码来讲,makeRDD方法就是parallize()方法
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
本地文件系统,所有Hadoop支持的数据集,比如HDFS和HBase等
val sparkConf =new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
//使用textFile方法从指定路径获取
val fileRDD: RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()
def textFile(
path: String,
//读取文件时,传递的参数为最小分区数,但不一定就是这个分区数,取决于hadoop读取文件时的分片规则
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
//比较defaultParralelism的和2的最小值,默认的SparkConf中没有设置spark.default.parallelism,默认defaultParallelism=totalCores(当前本地集群可以用的总核数),目的为了最大限度并行运行,取默认值和2的最小值,defaultMinPartitions和minPartitions不是最终分区数,但是会影响最终分区数!最终分区数,还取决于切片数!
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
override def getPartitions: Array[Partition] = { val jobConf = getJobConf() // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) try { // 调用输入格式(org.apache.hadoop.mapred.TextInputFormat)进行切片,切片时, minPartitions=2 val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions) // 是否过滤空切片后的切片集合 val inputSplits = if (ignoreEmptySplits) { allInputSplits.filter(_.getLength > 0) } else { allInputSplits } // 如果切的是1片,且是针对文件的切片,做特殊处理 if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) { val fileSplit = inputSplits(0).asInstanceOf[FileSplit] val path = fileSplit.getPath if (fileSplit.getLength > conf.get(IO_WARNING_LARGEFILETHRESHOLD)) { val codecFactory = new CompressionCodecFactory(jobConf) if (Utils.isFileSplittable(path, codecFactory)) { logWarning(s"Loading one large file ${path.toString} with only one partition, " + s"we can increase partition numbers for improving performance.") } else { logWarning(s"Loading one large unsplittable file ${path.toString} with only one " + s"partition, because the file is compressed by unsplittable compression codec.") } } } // 分区数=过滤后的切片数 val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } array } catch { case e: InvalidInputException if ignoreMissingFiles => logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" + s" partitions returned from this path.", e) Array.empty[Partition] }
由上述源码可知分区数=过滤后的切片数,因此查看切片方法源码
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { // 当前切片的数据的总大小 long totalSize = 0; // compute total size for (FileStatus file: files) { // check we have valid files if (file.isDirectory()) { throw new IOException("Not a file: "+ file.getPath()); } totalSize += file.getLen(); } // 计算 goalsize(期望每片大小),numSplits受并行度影响,如果设置了则按照设置个数来算,没设置就按照并行度和2取最小值def defaultMinPartitions: Int = math.min(defaultParallelism, 2) long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); // 默认为1,调节 org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 改变minSize long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); // generate splits ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits); NetworkTopology clusterMap = new NetworkTopology(); // 切片以文件为单位切片 for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); //文件非空 if (length != 0) { FileSystem fs = path.getFileSystem(job); BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(fs, path)) { // 获取文件的块大小,块大小在上传文件时,指定,如果不指定,默认 128M long blockSize = file.getBlockSize(); // 计算片大小 一般等于 blockSize long splitSize = computeSplitSize(goalSize, minSize, blockSize); long bytesRemaining = length; // 循环切片 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); bytesRemaining -= splitSize; } // 剩余部分 <=片大小1.1倍,整体作为1片 if (bytesRemaining != 0) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1])); } } else { //如果文件不可切,整个文件作为1片 String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } } else { // 文件长度为0,创建一个空切片 //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } return splits.toArray(new FileSplit[splits.size()]); }
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
// 在处理大数据时,goalsize一般很大,(例如10T/100)一般情况下,blockSize作为片大小
return Math.max(minSize, Math.min(goalSize, blockSize));
//下面是hadoop中FileInputFormat的切片大小公式,很类似
Math.max(minSize,Math.min(maxSize,blockSize))其中minsize默认值为1,maxsize非常大
总结:从分析源码中获取的关键信息为,分区数=切片数,切片大小计算方法为max(minSize, Math.min(goalSize, blockSize));查看源码得知minSize=1,blockSize默认为128M(本地模式32M)
@Test
def TestTextFile(): Unit ={
val result:RDD[(String,Int)]=sparkContext.textFile("input1",6)
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
//对结果进行打印输出
//println(result.collect().mkString(","))
result.saveAsTextFile("output")
}
input1中有aa.txt=351b bb.txt=349b文件goalSize=700/6=116.7b
切片大小为max(1,min(333.3b,32m))=116.7b
切片数为351/116.7=3+余数 349/116.7=2+余数,剩余部分小于1.1倍的116.7,因此放在一个分区内,总共有0-5六个partition
def makeRDD[T: ClassTag](
seq: Seq[T],
//numSlices: 控制集合创建几个分区!
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
def defaultParallelism: Int = {
assertNotStopped()
taskScheduler.defaultParallelism
}
//抽象方法
def defaultParallelism(): Int
ctl+H找到实现类
override def defaultParallelism(): Int = backend.defaultParallelism()
点击defaultParallelism()
/**
* A backend interface for scheduling systems that allows plugging in different ones under
* TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as
* machines become available and can launch tasks on them.
*/
private[spark] trait SchedulerBackend {
private val appId = "spark-application-" + System.currentTimeMillis
def start(): Unit
def stop(): Unit
def reviveOffers(): Unit
def defaultParallelism(): Int
ctl+h找到SchedulerBackend的实现类CoarseGrainedSchedulerBackend
// 默认defaultParallelism=totalCores(当前本地集群可以用的总核数),目的为了最大限度并行运行!
override def defaultParallelism(): Int = {
//点击getInt方法查看
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
def getInt(key: String, defaultValue: Int): Int = catchIllegalValue(key) {
getOption(key).map(_.toInt).getOrElse(defaultValue)
}
默认defaultParallelism=totalCores(当前本地集群可以用的总核数),目的为了最大限度并行运行!standalone / YARN模式, totalCores是Job申请的总的核数!
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = { //检查分区数是否合法 if (numSlices < 1) { throw new IllegalArgumentException("Positive number of partitions required") } // Sequences need to be sliced at the same set of index positions for operations // like RDD.zip() to behave as expected /*length:6 numSlices:4 */ def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { /* (0 until numSlices) = [0,4) 0: (0,1) 1: (1,3) 2: (3,4) 3: (4,6) */ (0 until numSlices).iterator.map { i => val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) } } ...... 对range类型进行特殊处理 // 非Ranger,按此方法处理 case _ => //Array(1,2,3,4,5,6) val array = seq.toArray // To prevent O(n^2) operations for List etc // 返回 {(0,1),(1,3),(3,4),(4,6)} // 1,(2,3),4,(5,6) positions(array.length, numSlices) .map { case (start, end) => array.slice(start, end).toSeq // 1,(2,3),4,(5,6) }.toSeq }
class RDDTest { //本地集群总核数取决于local[N] local:1核local[2]: 2核 local[*] : 所有核 val sparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("My app")) //由于本机cpu为4核,makeRDD方法没有设置并行度,按照local[*]参数来配置分区,因此该例计算文件个数为4个 @Test def testMakeRDD()={ val list = List(1,2,3,4,5,6) val rdd1 = sparkContext.makeRDD(list) rdd1.saveAsTextFile("output") } } partition-00000 1 partition-00001 2,3 partition-00002 4 partition-00003 5,6
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。