private def writeBlocks(value: T): Int = {
  // Store a copy of the broadcast variable in the driver so that tasks run on the driver
  // do not create a duplicate copy of the broadcast variable's value.
  SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
    tellMaster = false)
  val blocks =
    TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
  blocks.zipWithIndex.foreach { case (block, i) =>
    SparkEnv.get.blockManager.putBytes(
      BroadcastBlockId(id, "piece" + i),
      block,
      StorageLevel.MEMORY_AND_DISK_SER,
      tellMaster = true)
  }
  blocks.length
}
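The snippet above is TorrentBroadcast.writeBlocks: the driver first caches the full value locally, then cuts the serialized value into pieces of blockSize and registers each piece with the BlockManager (tellMaster = true) so executors can locate them. blockifyObject itself is not shown; the following is only a minimal sketch of the idea, using plain Java serialization instead of Spark's pluggable serializer and compression codec, with hypothetical names (BlockifySketch, blockify):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

object BlockifySketch {
  // Serialize a value and slice the resulting bytes into fixed-size chunks,
  // one ByteBuffer per chunk (the real code also supports compression).
  def blockify(value: AnyRef, blockSize: Int): Array[ByteBuffer] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(value)
    oos.close()
    bos.toByteArray.grouped(blockSize).map(chunk => ByteBuffer.wrap(chunk)).toArray
  }

  def main(args: Array[String]): Unit = {
    // ~1 MB of payload cut into 256 KB pieces (plus a little serialization overhead).
    val pieces = blockify(Array.fill(1 << 20)(42.toByte), blockSize = 256 * 1024)
    println(s"Split into ${pieces.length} pieces")
  }
}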
// Mapping from block manager id to the block manager's information.
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

......

// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

private def updateBlockInfo(
    blockManagerId: BlockManagerId,
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long,
    tachyonSize: Long): Boolean = {

  if (!blockManagerInfo.contains(blockManagerId)) {
    if (blockManagerId.isDriver && !isLocal) {
      // We intentionally do not register the master (except in local mode),
      // so we should not indicate failure.
      return true
    } else {
      return false
    }
  }

  if (blockId == null) {
    blockManagerInfo(blockManagerId).updateLastSeenMs()
    return true
  }

  blockManagerInfo(blockManagerId).updateBlockInfo(
    blockId, storageLevel, memSize, diskSize, tachyonSize)

  var locations: mutable.HashSet[BlockManagerId] = null
  if (blockLocations.containsKey(blockId)) {
    locations = blockLocations.get(blockId)
  } else {
    locations = new mutable.HashSet[BlockManagerId]
    blockLocations.put(blockId, locations)
  }

  if (storageLevel.isValid) {
    locations.add(blockManagerId)
  } else {
    locations.remove(blockManagerId)
  }

  // Remove the block from master tracking if it has been removed on all slaves.
  if (locations.size == 0) {
    blockLocations.remove(blockId)
  }
  true
}
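updateBlockInfo is the master-side handler that keeps blockManagerInfo and blockLocations in sync with what executors report. The following is a minimal, self-contained model of just the blockLocations bookkeeping; executors and blocks are reduced to plain strings, and the names (BlockLocationSketch, stillStored) are invented for illustration:

import scala.collection.mutable

object BlockLocationSketch {
  // A stripped-down model of the master-side bookkeeping: which executors
  // (identified here by plain strings) currently hold which block.
  private val blockLocations = new mutable.HashMap[String, mutable.HashSet[String]]

  def updateBlockInfo(executorId: String, blockId: String, stillStored: Boolean): Unit = {
    val locations = blockLocations.getOrElseUpdate(blockId, new mutable.HashSet[String])
    if (stillStored) locations.add(executorId) else locations.remove(executorId)
    // Once no executor holds the block any more, stop tracking it entirely.
    if (locations.isEmpty) blockLocations.remove(blockId)
  }

  def getLocations(blockId: String): Seq[String] =
    blockLocations.get(blockId).map(_.toSeq).getOrElse(Seq.empty)

  def main(args: Array[String]): Unit = {
    updateBlockInfo("exec-1", "broadcast_0_piece0", stillStored = true)
    updateBlockInfo("exec-2", "broadcast_0_piece0", stillStored = true)
    updateBlockInfo("exec-1", "broadcast_0_piece0", stillStored = false)
    println(getLocations("broadcast_0_piece0"))   // List(exec-2)
  }
}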
override protected def getValue() = {
  _value
}

@transient private lazy val _value: T = readBroadcastBlock()

// The body of readBroadcastBlock(), which _value evaluates lazily on first access:
TorrentBroadcast.synchronized {
  setConf(SparkEnv.get.conf)
  SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
    case Some(x) =>
      x.asInstanceOf[T]

    case None =>
      logInfo("Started reading broadcast variable " + id)
      val startTimeMs = System.currentTimeMillis()
      val blocks = readBlocks()
      logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))

      val obj = TorrentBroadcast.unBlockifyObject[T](
        blocks, SparkEnv.get.serializer, compressionCodec)
      // Store the merged copy in BlockManager so other tasks on this executor don't
      // need to re-fetch it.
      SparkEnv.get.blockManager.putSingle(
        broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
      obj
  }
}
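The lazy, @transient _value is what makes a broadcast cheap to ship: only the handle is serialized with each task, and the value is read (from the local BlockManager, or from remote pieces) the first time a task on a given executor dereferences it. A small, self-contained sketch of that pattern, with hypothetical names (LazyBroadcastValueSketch, Handle):

object LazyBroadcastValueSketch {
  class Handle(id: Long) extends Serializable {
    // Not serialized with the handle; materialized once per JVM on first access.
    @transient private lazy val _value: String = {
      println(s"Reading broadcast $id on first access")
      s"value-$id"
    }
    def value: String = _value
  }

  def main(args: Array[String]): Unit = {
    val h = new Handle(0)
    println(h.value)   // triggers the "read"
    println(h.value)   // served from the already-initialized lazy val
  }
}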
// The body of readBlocks(), which assembles the broadcast pieces on an executor:
// Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
// to the driver, so other executors can pull these chunks from this executor as well.
val blocks = new Array[ByteBuffer](numBlocks)
val bm = SparkEnv.get.blockManager

for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
  val pieceId = BroadcastBlockId(id, "piece" + pid)
  logDebug(s"Reading piece $pieceId of $broadcastId")
  // First try getLocalBytes because there is a chance that previous attempts to fetch the
  // broadcast blocks have already fetched some of the blocks. In that case, some blocks
  // would be available locally (on this executor).
  def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
  def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
    // If we found the block from remote executors/driver's BlockManager, put the block
    // in this executor's BlockManager.
    SparkEnv.get.blockManager.putBytes(
      pieceId,
      block,
      StorageLevel.MEMORY_AND_DISK_SER,
      tellMaster = true)
    block
  }
  val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
    throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
  blocks(pid) = block
}
blocks
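The fetch loop visits the pieces in random order, so different executors start from different pieces and the load spreads across the cluster BitTorrent-style, and each piece is looked up locally before falling back to a remote fetch. A toy version of that control flow, with invented names (FetchPiecesSketch, fetchAll) and strings standing in for ByteBuffers:

import scala.util.Random

object FetchPiecesSketch {
  // Pieces are visited in random order; each piece is looked up "locally" first
  // and only fetched "remotely" on a miss, mirroring getLocal.orElse(getRemote).
  def fetchAll(numBlocks: Int,
               local: Int => Option[String],
               remote: Int => Option[String]): Array[String] = {
    val blocks = new Array[String](numBlocks)
    for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
      def getLocal: Option[String] = local(pid)
      def getRemote: Option[String] = remote(pid)
      blocks(pid) = getLocal.orElse(getRemote).getOrElse(
        throw new RuntimeException(s"Failed to get piece $pid"))
    }
    blocks
  }

  def main(args: Array[String]): Unit = {
    val localPieces = Map(0 -> "piece-0")                   // already fetched earlier
    val remotePieces = Map(1 -> "piece-1", 2 -> "piece-2")  // only available remotely
    println(fetchAll(3, localPieces.get, remotePieces.get).mkString(", "))
  }
}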
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
  require(blockId != null, "BlockId is null")
  val locations = Random.shuffle(master.getLocations(blockId))
  for (loc <- locations) {
    logDebug(s"Getting remote block $blockId from $loc")
    val data = blockTransferService.fetchBlockSync(
      loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()

    if (data != null) {
      if (asBlockResult) {
        return Some(new BlockResult(
          dataDeserialize(blockId, data),
          DataReadMethod.Network,
          data.limit()))
      } else {
        return Some(data)
      }
    }
    logDebug(s"The value of block $blockId is null")
  }
  logDebug(s"Block $blockId not found")
  None
}
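doGetRemote asks the master for every known location of the block, shuffles the list to spread load, and returns the data from the first location that still has it. The shape of that loop reduced to a self-contained sketch (fetchFromAny and the executor names are made up):

import scala.util.Random

object RemoteFetchSketch {
  // Try each known location in a random order and return the first successful fetch.
  def fetchFromAny[A](locations: Seq[String], fetch: String => Option[A]): Option[A] = {
    for (loc <- Random.shuffle(locations)) {
      fetch(loc) match {
        case Some(data) => return Some(data)
        case None       => // this replica is gone or unreachable, try the next one
      }
    }
    None
  }

  def main(args: Array[String]): Unit = {
    val replicas = Map("exec-2" -> "payload")   // only one listed location still has the block
    println(fetchFromAny(Seq("exec-1", "exec-2", "exec-3"), replicas.get))   // Some(payload)
  }
}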
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
def getOrCompute[T](
    rdd: RDD[T],
    partition: Partition,
    context: TaskContext,
    storageLevel: StorageLevel): Iterator[T] = {

  val key = RDDBlockId(rdd.id, partition.index)
  logDebug(s"Looking for partition $key")
  blockManager.get(key) match {
    case Some(blockResult) =>
      // Partition is already materialized, so just return its values
      ......
      val iter = blockResult.data.asInstanceOf[Iterator[T]]
      new InterruptibleIterator[T](context, iter) {
        override def next(): T = {
          existingMetrics.incRecordsRead(1)
          delegate.next()
        }
      }

    case None =>
      ......
      // Otherwise, we have to load the partition ourselves
      try {
        logInfo(s"Partition $key not found, computing it")
        val computedValues = rdd.computeOrReadCheckpoint(partition, context)
        // If the task is running locally, do not persist the result
        if (context.isRunningLocally) {
          return computedValues
        }
        // Otherwise, cache the values and keep track of any updates in block statuses
        val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
        val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
        val metrics = context.taskMetrics
        val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
        metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
        new InterruptibleIterator(context, cachedValues)
      } finally {
        ......
      }
  }
}
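getOrCompute is the classic cache-aside pattern: a BlockManager hit returns the materialized partition, a miss computes it via computeOrReadCheckpoint and stores it before returning. A stripped-down model of that flow, with no metrics or InterruptibleIterator and invented names (GetOrComputeSketch, cache):

import scala.collection.mutable

object GetOrComputeSketch {
  // Hit -> return the stored values; miss -> compute, store, then return them.
  private val cache = new mutable.HashMap[String, Vector[Int]]

  def getOrCompute(key: String, compute: () => Iterator[Int]): Iterator[Int] =
    cache.get(key) match {
      case Some(values) =>
        println(s"Partition $key found in cache")
        values.iterator
      case None =>
        println(s"Partition $key not found, computing it")
        val values = compute().toVector   // materialize so it can be stored and re-read
        cache.put(key, values)
        values.iterator
    }

  def main(args: Array[String]): Unit = {
    val compute = () => Iterator(1, 2, 3)
    println(getOrCompute("rdd_0_0", compute).sum)   // computes
    println(getOrCompute("rdd_0_0", compute).sum)   // served from the cache
  }
}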
private def putInBlockManager[T](
    key: BlockId,
    values: Iterator[T],
    level: StorageLevel,
    updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
    effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {

  val putLevel = effectiveStorageLevel.getOrElse(level)
  if (!putLevel.useMemory) {
    /*
     * This RDD is not to be cached in memory, so we can just pass the computed values as an
     * iterator directly to the BlockManager rather than first fully unrolling it in memory.
     */
    updatedBlocks ++=
      blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
    blockManager.get(key) match {
      case Some(v) => v.data.asInstanceOf[Iterator[T]]
      case None =>
        logInfo(s"Failure to store $key")
        throw new BlockException(key, s"Block manager failed to return cached value for $key!")
    }
  } else {
    /*
     * This RDD is to be cached in memory. In this case we cannot pass the computed values
     * to the BlockManager as an iterator and expect to read it back later. This is because
     * we may end up dropping a partition from memory store before getting it back.
     *
     * In addition, we must be careful to not unroll the entire partition in memory at once.
     * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
     * single partition. Instead, we unroll the values cautiously, potentially aborting and
     * dropping the partition to disk if applicable.
     */
    blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
      case Left(arr) =>
        // We have successfully unrolled the entire partition, so cache it in memory
        updatedBlocks ++=
          blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
        arr.iterator.asInstanceOf[Iterator[T]]
      case Right(it) =>
        // There is not enough space to cache this partition in memory
        val returnValues = it.asInstanceOf[Iterator[T]]
        if (putLevel.useDisk) {
          logWarning(s"Persisting partition $key to disk instead.")
          val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
            useOffHeap = false, deserialized = false, putLevel.replication)
          putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
        } else {
          returnValues
        }
    }
  }
}
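The memory branch of putInBlockManager hinges on unrollSafely: either the whole partition fits in memory (Left), or it does not (Right) and the method falls back to a disk-only storage level when the original level allows disk, otherwise it returns the iterator without caching anything. The following is not Spark's unrollSafely, only a sketch of that decision, with an invented memoryBudget measured in element counts:

object UnrollFallbackSketch {
  sealed trait PutResult
  case class CachedInMemory(values: Vector[Int]) extends PutResult
  case class SpilledToDisk(values: Vector[Int]) extends PutResult
  case object NotCached extends PutResult

  // Try to hold the whole partition in memory; if it does not fit, either spill to
  // "disk" (when the storage level allows it) or hand the values back uncached.
  def put(values: Iterator[Int], memoryBudget: Int, useDisk: Boolean): PutResult = {
    val buffer = Vector.newBuilder[Int]
    var count = 0
    // "Unroll safely": stop buffering as soon as the memory budget is exceeded.
    while (values.hasNext && count < memoryBudget) {
      buffer += values.next()
      count += 1
    }
    if (!values.hasNext) CachedInMemory(buffer.result())
    else if (useDisk) SpilledToDisk(buffer.result() ++ values.toVector)
    else NotCached
  }

  def main(args: Array[String]): Unit = {
    println(put(Iterator.range(0, 5), memoryBudget = 10, useDisk = false))   // CachedInMemory(...)
    println(put(Iterator.range(0, 50), memoryBudget = 10, useDisk = true))   // SpilledToDisk(...)
    println(put(Iterator.range(0, 50), memoryBudget = 10, useDisk = false))  // NotCached
  }
}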