赞
踩
Shuffle是Spark计算引擎的关键所在,是必须经历的一个阶段,在前面的文章中,我们剖析了Shuffle的原理以及Map阶段结果的输出与Reduce阶段结果如何读取。该篇文章是对前面两篇文章
【Spark源码解读之Shuffle原理剖析与源码分析】
【Spark存储机制源码剖析】
细节的深入探究。
了解Shuffle原理的读者都知道,整个Shuffle过程被划分为map和reduce阶段,在Spark Shuffle的过程中,会在map阶段将任务封装为ShuffleMapTask计算结果并且最终写入bucket中,由reduce阶段的ResultTask读取中间计算结果,在早期的版本中,shuffle过程有以下几个问题:
因此,在后期的版本中,对以上问题进行了优化,那么如何优化呢?以及其原理是怎样的呢?这就是该篇文章的目的。
在阅读后续的内容之前,这里先提出几个问题,我们可以带着问题去思考以及阅读源码,理解更加深刻:
map端如何优化大量的中间结果文件导致频繁的磁盘IO?
map端什么时候决定将数据spill到磁盘?
map端是否需要聚合数据,如何聚合数据?
map任务如何输出?
reduce端如何读取map阶段计算的中间结果?
…
对于第一个问题,在前面 的文章中介绍过,spark使用了consolidation
机制,将map任务的多个partition输出的bucket合并为一个,这样就解决了bucket数量很多,导致数据刷新到磁盘的时候产生大量的磁盘IO。
除此之外,spark在后期的版本中还做了很多优化,这也是本篇文章重点介绍知识点:
AppendOnlyMap
缓存并且对中间结果进行聚合计算,减少中间结果占用的内存大小。SizeTrackingAppendOnlyMap
与SizeTrackingPairBuffer
进行缓存,当大小myMemoryThreshold
的大小的时候,会将数据写入磁盘,防止内存溢出。我们了解到map阶段如何开始计算的入口为ShuffleMapTask.runTask
(不了解可以参考之前的文章),然后创建了ShuffleWriter
,并且调用了其write
方法,它是一个接口,其实现类主要有SortShuffleWriter
以及HashShuffleWriter
。因此进入到SortShuffleWriter.writer()
方法中,源码如下:
/** Write a bunch of records to this task's output */ override def write(records: Iterator[_ <: Product2[K, V]]): Unit = { if (dep.mapSideCombine) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") sorter = new ExternalSorter[K, V, C]( dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) sorter.insertAll(records) } else { // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side // if the operation being run is sortByKey. sorter = new ExternalSorter[K, V, V]( None, Some(dep.partitioner), None, dep.serializer) sorter.insertAll(records) } // Don't bother including the time to open the merged output file in the shuffle write time, // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId) val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId) val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths) mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) }
从上述源码中可以看到,其调用了ExternalSorter.insertAll()
方法,进入到方法中:
def insertAll(records: Iterator[_ <: Product2[K, V]]): Unit = { // TODO: stop combining if we find that the reduction factor isn't high val shouldCombine = aggregator.isDefined //对map端的结果写入AppendOnlyMap中并且进行聚合排序 if (shouldCombine) { // Combine values in-memory first using our AppendOnlyMap val mergeValue = aggregator.get.mergeValue val createCombiner = aggregator.get.createCombiner var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } while (records.hasNext) { addElementsRead() kv = records.next() map.changeValue((getPartition(kv._1), kv._1), update) maybeSpillCollection(usingMap = true) } } else if (bypassMergeSort) { //如果bypassMergeSort为true,那么不缓存,直接spill到磁盘文件 // SPARK-4479: Also bypass buffering if merge sort is bypassed to avoid defensive copies if (records.hasNext) { spillToPartitionFiles(records.map { kv => ((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C]) }) } } else { //如果不是以上两种情况,将map计算结果写入SizePairBuffer中,并且执行合并和排序 // Stick values into our buffer while (records.hasNext) { addElementsRead() val kv = records.next() buffer.insert((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C]) maybeSpillCollection(usingMap = false) } } }
在这里,我们可以看到map端计算结果的输出有三种输出方式:
AppendOnlyMap
中,并且进行聚合排序等操作(这种方式spark作业中必须定义了聚合函数以及排序函数)。bypassMergeSort
为true的时候map端结果不缓存,也不进行聚合和排序,直接spill到磁盘。shouldCombine
为false,则将结果输出到buffer缓存中。在这里需要提到bypassMergeThreshold
,该参数定义了map端各个partition的数据传递到reduce端进行合并(merge)的阀值,当大小小于该参数的时候,就会直接写入存储文件中,到reduce端统一聚合。
bypassMergeSort
参数标记是否需要传递到reduce端再做合并排序操作,由上面可以知道,当partition大小小于bypassMergeThreshold
时,该参数就会变为true,即到reduce端再做合并操作。
在深入这三种处理方式之前,有必要了解一下SizeTrackingAppendOnlyMap
以及SizeTrackingPairBuffer
。
在spark源码的org.apache.spark.util.collection
路径下,放置着spark内部封装的一系列集合类,便于spark内部使用:
SizeTrackingAppendOnlyMap
它的父类为AppendOnlyMap
,类似于HashMap的数据结构,它也定义了一系列的内部变量,比如负载因子、初始容量等等。
在其内部也定义了如何扩容,哈希等方法,可以类比HashMap,有兴趣的读者可以查看其源码。
在这里我们简单看一下其扩容算法:
/** Increase table size by 1, rehashing if necessary */ private def incrementSize() { curSize += 1 if (curSize > growThreshold) { growTable() } } /** Double the table's size and re-hash everything */ protected def growTable() { val newCapacity = capacity * 2 if (newCapacity >= (1 << 30)) { // We can't make the table this big because we want an array of 2x // that size for our data, but array sizes are at most Int.MaxValue throw new Exception("Can't make capacity bigger than 2^29 elements") } val newData = new Array[AnyRef](2 * newCapacity) val newMask = newCapacity - 1 // Insert all our old values into the new array. Note that because our old keys are // unique, there's no need to check for equality here when we insert. var oldPos = 0 while (oldPos < capacity) { if (!data(2 * oldPos).eq(null)) { val key = data(2 * oldPos) val value = data(2 * oldPos + 1) var newPos = rehash(key.hashCode) & newMask var i = 1 var keepGoing = true while (keepGoing) { val curKey = newData(2 * newPos) if (curKey.eq(null)) { newData(2 * newPos) = key newData(2 * newPos + 1) = value keepGoing = false } else { val delta = i newPos = (newPos + delta) & newMask i += 1 } } } oldPos += 1 } data = newData capacity = newCapacity mask = newMask growThreshold = (LOAD_FACTOR * newCapacity).toInt }
可以看到curSize > growThreshold
时,将调用growTable
方法将容量扩大一倍,然后将旧数组中的数据拷贝到新数组中。那么问题来了,对于spark这类内存计算框架,在大数据场景下,当数据量很大的时候,是否会无限制的扩容呢?这样做不会撑爆内存?答案是否定的。
实际上,spark使用了采样计算的方式,会预测估算未来AppendOnlyMap
的大小,那么如何采样计算呢?SizeTrackingAppendOnlyMap
实现了特质SizeTracker
,在这个类中实现了抽样算法,源码如下:
/** * Callback to be invoked after every update. */ protected def afterUpdate(): Unit = { numUpdates += 1 //如果达到了nextSampleNum采样间隔 if (nextSampleNum == numUpdates) { takeSample() } } /** * Take a new sample of the current collection's size. */ private def takeSample(): Unit = { samples.enqueue(Sample(SizeEstimator.estimate(this), numUpdates)) // Only use the last two samples to extrapolate //如果当前采样数量大于2时,则将sample执行一次出队操作,保证样本总数等于2 if (samples.size > 2) { samples.dequeue() } //计算bytesPerUpdate,计算公式如下: // (本次采集大小-上次采样大小)/(本次采集编号-上次采样编号) val bytesDelta = samples.toList.reverse match { case latest :: previous :: tail => (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates) // If fewer than 2 samples, assume no change case _ => 0 } bytesPerUpdate = math.max(0, bytesDelta) //计算下次采样间隔 nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong }
SizeTrackingPairBuffer
实际上是一个初始容量为64的Buffer,它也定义了一系列内部变量以及操作方法,有兴趣的读者可深入了解,这里不做过多介绍:
map端计算结果缓存聚合
这种情况下,spark作业中必须定义聚合器函数,这样就可以在map端对计算结果进行聚合和排序操作,减少了网络间大量的数据传输以及内存空间的占用。对于中间输出数据不是一次性读取,而是逐条放入AppendOnlyMap
的缓存进行溢出判断,当超出myMemoryThreshold
的大小时,将数据写入磁盘,防止内存溢出。
map端简单缓存,排序分组,在reduce端合并组合
这种情况是在spark作业中没有定义聚合器函数,这种方式会使用指定的排序函数对数据按照partition或者key进行排序,最后按照partition顺序合并写入同一文件,它会将多个bucket合并到一个文件,这样减少map输出的文件数量,节省了磁盘IO,提升了性能,对SizeTrackingPairBuffer
的缓存进行溢出判断,当超出myMemoryThreshold
大小时,将数据写入磁盘,防止内存溢出。
map端溢出分区文件,在reduce端合并组合
如果bypassMergeSort
标记为true,那么就会将结果传递到reduce端再做合并与排序,这种情况不使用缓存,而是将数据按照partition写入不同的文件,最后按照partition顺序合并写入同一个文件。这种同样会将多个bucket合并到同一个文件,通过减少map输出的文件数量,节省了磁盘IO,最终提升了性能。
在了解了map阶段处理过程后,我们看看reduce端是如何处理的,实际上,通过阅读之前的文章,我们就可以了解到在reduce端会使用BlockStoreShuffleFetcher.fetch()
方法去Driver的MapOutputTracker
中的获取MapStatus
的信息,然后去相应的BlockManager
中获取相应的中间结果,最终进行计算。
那么在reduce端spark又做了哪些优化呢?
实际上,在reduce端,将中间保存在ShuffleBlockFetcherIterator
中,该类中,定义了一系列成员变量,我们需要理解它们的含义,这里简单罗列一下,具体源码读者可以详细深入阅读:
在reduce端,为了优化程序,充分利用集群的资源,reduce端每一批请求的字节总数不能超过maxBytesInFlight
,而且每个请求的字节数不能超过maxBytesInFlight
的五分之一,这样做提高了请求的并发度,允许5个请求分别从5个节点获取数据,最大限度利用了资源。可以通过spark.reducer.maxMbInFlight
参数来控制该大小。
以上就是今天文章介绍的内容,通过探究shuffle的计算细节,我们了解学到了以下知识点:
map端处理计算结果的几种方式。
map端进行数据的聚合,降低了网络IO,提升了系统性能。
map端以及reduce通过逐条读取数据,避免了大量数据撑爆内存。
发送请求时分批发送,限制分批发送的大小,并行发送请求以及将多个请求数据下的请求合并等优化点。
谢谢阅读,如有问题欢迎留言讨论!!!
欢迎加入大数据学习交流群:731423890
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。