
Spark Study Notes 6: Broadcast and RDD Cache

1. Broadcast

1.1. Creation Flow



The three put* methods of BlockManager (putIterator, putBytes, putArray) all take a tellMaster: Boolean = true parameter, which defaults to true. This parameter is the switch that decides whether the Master (BlockManagerMasterActor) is notified: when it is true, the BlockInfo is sent to the Master after the data has been written to the local storage system, so that other nodes can obtain the information for the corresponding BlockId from the BlockManagerMasterActor.
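
To make the role of tellMaster concrete, here is a minimal, self-contained sketch of the pattern; it is not the actual BlockManager code, and the String block ids, the local map, and the masterIndex map are all stand-ins for illustration.

  // Stand-in sketch only: a local write followed by an optional report to the
  // master, gated by tellMaster in the same way as the put* methods above.
  object PutSketch {
    private val localStore  = scala.collection.mutable.Map[String, Array[Byte]]()
    private val masterIndex = scala.collection.mutable.Map[String, String]()  // blockId -> host

    def putBytes(blockId: String, bytes: Array[Byte], host: String,
                 tellMaster: Boolean = true): Unit = {
      localStore(blockId) = bytes       // write to the local storage system first
      if (tellMaster) {
        masterIndex(blockId) = host     // analogous to sending UpdateBlockInfo to the master,
      }                                 // which lets other nodes locate this block
    }
  }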

1.1.1 TorrentBroadcast.writeBlocks

  private def writeBlocks(value: T): Int = {
    // Store a copy of the broadcast variable in the driver so that tasks run on the driver
    // do not create a duplicate copy of the broadcast variable's value.
    SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
      tellMaster = false)
    val blocks =
      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
    blocks.zipWithIndex.foreach { case (block, i) =>
      SparkEnv.get.blockManager.putBytes(
        BroadcastBlockId(id, "piece" + i),
        block,
        StorageLevel.MEMORY_AND_DISK_SER,
        tellMaster = true)
    }
    blocks.length
  }
(1) Store the value as a whole in the local BlockManager;
(2) Split the value into pieces of a fixed size (4096KB = 4MB by default; each piece corresponds to one block with its own BroadcastBlockId), store them locally, and report them to the Master.
Note:
HttpBroadcast only stores the value as a whole locally and notifies the Master; it does not split the data. By splitting the value into pieces, TorrentBroadcast ends up with copies of the pieces spread across many nodes, so a node that newly needs the value can fetch pieces from several sources in parallel, which makes remote reads more efficient.
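
As a rough illustration of the splitting step, here is a small self-contained sketch. It is not the actual TorrentBroadcast.blockifyObject implementation (which serializes the value through SparkEnv's serializer and optionally compresses it first); it only shows how an already-serialized byte array would be cut into 4 MB pieces.

  // Illustration only: cut a serialized byte array into fixed-size pieces.
  def blockify(serialized: Array[Byte], pieceSize: Int = 4 * 1024 * 1024): Array[Array[Byte]] =
    serialized.grouped(pieceSize).toArray

  // Each piece is then stored under its own BroadcastBlockId whose field is "piece" + i,
  // so the block names look like broadcast_<id>_piece0, broadcast_<id>_piece1, ...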

1.1.2 BlockManagerMasterActor.updateBlockInfo

When BlockManagerMasterActor receives an UpdateBlockInfo message, it calls the updateBlockInfo method.
This method mainly updates the following members:
  // Mapping from block manager id to the block manager's information.
  private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
  ......
  // Mapping from block id to the set of block managers that have the block.
  private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
A block may be replicated on multiple nodes, so each BlockId maps to a set of locations, each represented by a BlockManagerId.
Method code:
  private def updateBlockInfo(
      blockManagerId: BlockManagerId,
      blockId: BlockId,
      storageLevel: StorageLevel,
      memSize: Long,
      diskSize: Long,
      tachyonSize: Long): Boolean = {
    if (!blockManagerInfo.contains(blockManagerId)) {
      if (blockManagerId.isDriver && !isLocal) {
        // We intentionally do not register the master (except in local mode),
        // so we should not indicate failure.
        return true
      } else {
        return false
      }
    }
    if (blockId == null) {
      blockManagerInfo(blockManagerId).updateLastSeenMs()
      return true
    }
    blockManagerInfo(blockManagerId).updateBlockInfo(
      blockId, storageLevel, memSize, diskSize, tachyonSize)
    var locations: mutable.HashSet[BlockManagerId] = null
    if (blockLocations.containsKey(blockId)) {
      locations = blockLocations.get(blockId)
    } else {
      locations = new mutable.HashSet[BlockManagerId]
      blockLocations.put(blockId, locations)
    }
    if (storageLevel.isValid) {
      locations.add(blockManagerId)
    } else {
      locations.remove(blockManagerId)
    }
    // Remove the block from master tracking if it has been removed on all slaves.
    if (locations.size == 0) {
      blockLocations.remove(blockId)
    }
    true
  }
(1) Update the BlockManagerInfo of the reporting BlockManager;
(2) Update the block's location information (a set of BlockManagerId values).
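
The location set maintained here is exactly what the read path consults later through master.getLocations (section 1.2.4). Below is a tiny self-contained sketch of that lookup over a map of the same shape; it is an illustration with String ids, not the actual master code.

  import java.util.{HashMap => JHashMap}
  import scala.collection.mutable

  // Same shape as blockLocations above: block id -> set of block managers holding the block.
  val blockLocations = new JHashMap[String, mutable.HashSet[String]]()

  def getLocationsSketch(blockId: String): Seq[String] =
    if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq
    else Seq.empty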

1.2. Read Flow



The value is read from the local BlockManager first; a remote fetch is attempted only if the local read fails.

1.2.1 TorrentBroadcast.getValue

  override protected def getValue() = {
    _value
  }
_value is a lazy variable:
  @transient private lazy val _value: T = readBroadcastBlock()
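
For orientation, this is the path an ordinary broadcast read goes through. Below is a minimal driver-side usage sketch using the standard SparkContext API; it assumes a live SparkContext sc (e.g. in spark-shell) and an existing RDD[String] called someRdd, both of which are illustrative.

  // Create a broadcast variable on the driver; this runs writeBlocks (section 1.1.1).
  val lookupTable = sc.broadcast(Map("a" -> 1, "b" -> 2))

  // Reading .value inside a task touches the lazy _value field above; the first
  // access on each executor triggers readBroadcastBlock and readBlocks (below).
  val counts = someRdd.map(word => lookupTable.value.getOrElse(word, 0))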

1.2.2 TorrentBroadcast.readBroadcastBlock

  TorrentBroadcast.synchronized {
    setConf(SparkEnv.get.conf)
    SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
      case Some(x) =>
        x.asInstanceOf[T]
      case None =>
        logInfo("Started reading broadcast variable " + id)
        val startTimeMs = System.currentTimeMillis()
        val blocks = readBlocks()
        logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
        val obj = TorrentBroadcast.unBlockifyObject[T](
          blocks, SparkEnv.get.serializer, compressionCodec)
        // Store the merged copy in BlockManager so other tasks on this executor don't
        // need to re-fetch it.
        SparkEnv.get.blockManager.putSingle(
          broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
        obj
    }
  }
(1) First read the value for the broadcastId from the local BlockManager;
(2) If it is not present, call readBlocks to fetch all the pieces;
(3) Reassemble the fetched pieces into the original value;
(4) Call BlockManager.putSingle to store the merged value locally with tellMaster = false, i.e. without notifying the Master.

1.2.3 TorrentBroadcast.readBlocks

  // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
  // to the driver, so other executors can pull these chunks from this executor as well.
  val blocks = new Array[ByteBuffer](numBlocks)
  val bm = SparkEnv.get.blockManager
  for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
    val pieceId = BroadcastBlockId(id, "piece" + pid)
    logDebug(s"Reading piece $pieceId of $broadcastId")
    // First try getLocalBytes because there is a chance that previous attempts to fetch the
    // broadcast blocks have already fetched some of the blocks. In that case, some blocks
    // would be available locally (on this executor).
    def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
    def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
      // If we found the block from remote executors/driver's BlockManager, put the block
      // in this executor's BlockManager.
      SparkEnv.get.blockManager.putBytes(
        pieceId,
        block,
        StorageLevel.MEMORY_AND_DISK_SER,
        tellMaster = true)
      block
    }
    val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
      throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
    blocks(pid) = block
  }
  blocks
(1) First try to get each piece from the local BlockManager;
(2) If a piece is not available locally, fetch it from a remote node via BlockManager.getRemoteBytes (which calls BlockManager.doGetRemote, section 1.2.4), store the fetched piece locally, and notify the Master.
The pieces are visited in random order, so different executors pull different pieces from different sources; this is what gives the torrent-like behavior, as the small illustration below shows.
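
The shuffled fetch order in the loop above is what spreads the load: because every executor walks the pieces in its own random order and re-publishes each fetched piece with tellMaster = true, executors quickly become sources for one another instead of all pulling from the driver. A tiny illustration of the ordering alone:

  import scala.util.Random

  val numBlocks = 5
  // Each executor gets its own random order, e.g. List(3, 0, 4, 1, 2), so the first
  // fetches are spread over different pieces rather than all starting at piece 0.
  val fetchOrder = Random.shuffle(Seq.range(0, numBlocks))
  println(fetchOrder)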

1.2.4 BlockManager.doGetRemote

  private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
    require(blockId != null, "BlockId is null")
    val locations = Random.shuffle(master.getLocations(blockId))
    for (loc <- locations) {
      logDebug(s"Getting remote block $blockId from $loc")
      val data = blockTransferService.fetchBlockSync(
        loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
      if (data != null) {
        if (asBlockResult) {
          return Some(new BlockResult(
            dataDeserialize(blockId, data),
            DataReadMethod.Network,
            data.limit()))
        } else {
          return Some(data)
        }
      }
      logDebug(s"The value of block $blockId is null")
    }
    logDebug(s"Block $blockId not found")
    None
  }
(1) Ask the Master for the locations of the block; the result is a sequence of BlockManagerId values, which is randomly shuffled;
(2) Fetch the block from the remote nodes through the NettyBlockTransferService, trying the shuffled locations one by one until one of them returns the data.

2. RDD cache

2.1. RDD Cache Read/Write Flow



(1) The entry point for RDD cache reads and writes is RDD.iterator, which is invoked while the RDD is being computed (a user-level sketch follows this list);
(2) CacheManager.getOrCompute first looks the partition's block up in the BlockManager (one partition corresponds to one block); if it is not found, it calls RDD.computeOrReadCheckpoint and caches the computed result;
(3) The actual writes to Memory and Disk follow the same path as for Broadcast;
(4) The read path through the BlockManager is likewise similar to Broadcast.
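
Below is a minimal user-level sketch of what triggers this path, using the standard RDD API. It assumes a live SparkContext sc; the input path is just a placeholder.

  // Mark the RDD for caching (cache() is persist(StorageLevel.MEMORY_ONLY)).
  val words = sc.textFile("/tmp/input.txt").flatMap(_.split(" ")).cache()

  // First action: each task calls RDD.iterator, which goes through
  // CacheManager.getOrCompute, computes the partition, and stores it as a block.
  words.count()

  // Second action: getOrCompute now finds the block in the BlockManager and
  // returns the cached values without recomputing the partition.
  words.count()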

2.2. RDD.iterator

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }
Based on the RDD's storageLevel, it decides whether the partition goes through the cache: any level other than StorageLevel.NONE routes the read through CacheManager.getOrCompute.

2.3. CacheManager.getOrCompute

  def getOrCompute[T](
      rdd: RDD[T],
      partition: Partition,
      context: TaskContext,
      storageLevel: StorageLevel): Iterator[T] = {
    val key = RDDBlockId(rdd.id, partition.index)
    logDebug(s"Looking for partition $key")
    blockManager.get(key) match {
      case Some(blockResult) =>
        // Partition is already materialized, so just return its values
        ......
        val iter = blockResult.data.asInstanceOf[Iterator[T]]
        new InterruptibleIterator[T](context, iter) {
          override def next(): T = {
            existingMetrics.incRecordsRead(1)
            delegate.next()
          }
        }
      case None =>
        ......
        // Otherwise, we have to load the partition ourselves
        try {
          logInfo(s"Partition $key not found, computing it")
          val computedValues = rdd.computeOrReadCheckpoint(partition, context)
          // If the task is running locally, do not persist the result
          if (context.isRunningLocally) {
            return computedValues
          }
          // Otherwise, cache the values and keep track of any updates in block statuses
          val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
          val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
          val metrics = context.taskMetrics
          val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
          metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
          new InterruptibleIterator(context, cachedValues)
        } finally {
          ......
        }
    }
  }
(1) Build an RDDBlockId from the RDD's id and the partition's index (see the small naming sketch after this list);
(2) Look up the block for that RDDBlockId in the BlockManager;
(3) If it is found, return its contents directly;
(4) If it is not found, call RDD.computeOrReadCheckpoint to compute the partition;
(5) Call CacheManager.putInBlockManager to cache the computed partition.
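
As a small aside on the key in step (1): the block name derived from the RDDBlockId is what appears in the storage UI and in executor logs. The concrete ids below are made up for illustration.

  import org.apache.spark.storage.RDDBlockId

  // The cache key for partition 0 of the RDD with id 3. The corresponding block
  // name has the form "rdd_<rddId>_<partitionIndex>".
  val key = RDDBlockId(3, 0)
  println(key.name)   // expected to print: rdd_3_0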

2.4. CacheManager.putInBlockManager

  private def putInBlockManager[T](
      key: BlockId,
      values: Iterator[T],
      level: StorageLevel,
      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
      effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
    val putLevel = effectiveStorageLevel.getOrElse(level)
    if (!putLevel.useMemory) {
      /*
       * This RDD is not to be cached in memory, so we can just pass the computed values as an
       * iterator directly to the BlockManager rather than first fully unrolling it in memory.
       */
      updatedBlocks ++=
        blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
      blockManager.get(key) match {
        case Some(v) => v.data.asInstanceOf[Iterator[T]]
        case None =>
          logInfo(s"Failure to store $key")
          throw new BlockException(key, s"Block manager failed to return cached value for $key!")
      }
    } else {
      /*
       * This RDD is to be cached in memory. In this case we cannot pass the computed values
       * to the BlockManager as an iterator and expect to read it back later. This is because
       * we may end up dropping a partition from memory store before getting it back.
       *
       * In addition, we must be careful to not unroll the entire partition in memory at once.
       * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
       * single partition. Instead, we unroll the values cautiously, potentially aborting and
       * dropping the partition to disk if applicable.
       */
      blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
        case Left(arr) =>
          // We have successfully unrolled the entire partition, so cache it in memory
          updatedBlocks ++=
            blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
          arr.iterator.asInstanceOf[Iterator[T]]
        case Right(it) =>
          // There is not enough space to cache this partition in memory
          val returnValues = it.asInstanceOf[Iterator[T]]
          if (putLevel.useDisk) {
            logWarning(s"Persisting partition $key to disk instead.")
            val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
              useOffHeap = false, deserialized = false, putLevel.replication)
            putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
          } else {
            returnValues
          }
      }
    }
  }
(1) If the partition is not to be cached in memory, call BlockManager.putIterator directly;
(2) If it is to be cached in memory, first call MemoryStore.unrollSafely to determine whether the whole partition fits in memory;
(3) If it does, call BlockManager.putArray;
(4) If it does not, check whether the storage level allows the use of disk; if so, switch to a disk-only storage level and call putInBlockManager recursively, otherwise return the iterator without caching it (see the usage note after this list).
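
The practical consequence of step (4) is that the chosen storage level decides whether an oversized partition is spilled to disk or simply recomputed later. A small usage sketch, assuming a live SparkContext sc:

  import org.apache.spark.storage.StorageLevel

  // MEMORY_ONLY: putLevel.useDisk is false, so a partition that cannot be fully
  // unrolled in memory is returned uncached and recomputed on later actions.
  val memOnly = sc.parallelize(1 to 1000000).persist(StorageLevel.MEMORY_ONLY)

  // MEMORY_AND_DISK: putLevel.useDisk is true, so the same partition is written
  // out with a disk-only level instead (the recursive putInBlockManager call above).
  val memAndDisk = sc.parallelize(1 to 1000000).persist(StorageLevel.MEMORY_AND_DISK)

  memOnly.count()
  memAndDisk.count()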
