This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0ac516b [SPARK-25035][CORE] Avoiding memory mapping at disk-stored
blocks replication
0ac516b is described below
commit 0ac516bebd4cf647fe073af45298186103134f33
Author: “attilapiros” <[email protected]>
AuthorDate: Mon Feb 25 11:42:55 2019 -0800
[SPARK-25035][CORE] Avoiding memory mapping at disk-stored blocks
replication
Before this PR the method `BlockManager#putBlockDataAsStream()` (which is
used during block replication where the block data is received as a stream) was
reading the whole block content into the memory even at DISK_ONLY storage level.
With this change the received block data (which was temporarily stored in a
file) is simply moved into the right location backing the target block.
This way a possible OOM error is avoided.
In this implementation, to avoid code duplication, the method `doPutBytes` is
refactored into a template method called `BlockStoreUpdater`, which has
separate implementations to handle byte-buffer-based and temporary-file-based
block store updates.
With existing unit tests of `DistributedSuite` (the ones dealing with
replications):
- caching on disk, replicated (encryption = off) (with replication as
stream)
- caching on disk, replicated (encryption = on) (with replication as stream)
- caching in memory, serialized, replicated (encryption = on) (with
replication as stream)
- caching in memory, serialized, replicated (encryption = off) (with
replication as stream)
- etc.
And with new unit tests testing `putBlockDataAsStream` method directly:
- test putBlockDataAsStream with caching (encryption = off)
- test putBlockDataAsStream with caching (encryption = on)
- test putBlockDataAsStream with caching on disk (encryption = off)
- test putBlockDataAsStream with caching on disk (encryption = on)
Closes #23688 from attilapiros/SPARK-25035.
Authored-by: “attilapiros” <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
---
.../org/apache/spark/storage/BlockManager.scala | 315 +++++++++++++--------
.../scala/org/apache/spark/storage/DiskStore.scala | 30 +-
.../apache/spark/storage/BlockManagerSuite.scala | 60 +++-
3 files changed, 263 insertions(+), 142 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 79e9ee7..09928e4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -33,6 +33,7 @@ import scala.util.Random
import scala.util.control.NonFatal
import com.codahale.metrics.{MetricRegistry, MetricSet}
+import org.apache.commons.io.IOUtils
import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
@@ -222,6 +223,187 @@ private[spark] class BlockManager(
private val maxRemoteBlockToMem =
conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
/**
+ * Abstraction for storing blocks from bytes, whether they start in memory
or on disk.
+ *
+ * @param blockSize the decrypted size of the block
+ */
+ private abstract class BlockStoreUpdater[T](
+ blockSize: Long,
+ blockId: BlockId,
+ level: StorageLevel,
+ classTag: ClassTag[T],
+ tellMaster: Boolean,
+ keepReadLock: Boolean) {
+
+ /**
+ * Reads the block content into the memory. If the update of the block
store is based on a
+ * temporary file this could lead to loading the whole file into a
ChunkedByteBuffer.
+ */
+ protected def readToByteBuffer(): ChunkedByteBuffer
+
+ protected def blockData(): BlockData
+
+ protected def saveToDiskStore(): Unit
+
+ private def saveDeserializedValuesToMemoryStore(inputStream: InputStream):
Boolean = {
+ try {
+ val values = serializerManager.dataDeserializeStream(blockId,
inputStream)(classTag)
+ memoryStore.putIteratorAsValues(blockId, values, classTag) match {
+ case Right(_) => true
+ case Left(iter) =>
+ // If putting deserialized values in memory failed, we will put
the bytes directly
+ // to disk, so we don't need this iterator and can close it to
free resources
+ // earlier.
+ iter.close()
+ false
+ }
+ } finally {
+ IOUtils.closeQuietly(inputStream)
+ }
+ }
+
+ private def saveSerializedValuesToMemoryStore(bytes: ChunkedByteBuffer):
Boolean = {
+ val memoryMode = level.memoryMode
+ memoryStore.putBytes(blockId, blockSize, memoryMode, () => {
+ if (memoryMode == MemoryMode.OFF_HEAP &&
bytes.chunks.exists(!_.isDirect)) {
+ bytes.copy(Platform.allocateDirectBuffer)
+ } else {
+ bytes
+ }
+ })
+ }
+
+ /**
+ * Put the given data according to the given level in one of the block
stores, replicating
+ * the values if necessary.
+ *
+ * If the block already exists, this method will not overwrite it.
+ *
+ * If keepReadLock is true, this method will hold the read lock when it
returns (even if the
+ * block already exists). If false, this method will hold no locks when it
returns.
+ *
+ * @return true if the block was already present or if the put succeeded,
false otherwise.
+ */
+ def save(): Boolean = {
+ doPut(blockId, level, classTag, tellMaster, keepReadLock) { info =>
+ val startTimeNs = System.nanoTime()
+
+ // Since we're storing bytes, initiate the replication before storing
them locally.
+ // This is faster as data is already serialized and ready to send.
+ val replicationFuture = if (level.replication > 1) {
+ Future {
+ // This is a blocking action and should run in
futureExecutionContext which is a cached
+ // thread pool.
+ replicate(blockId, blockData(), level, classTag)
+ }(futureExecutionContext)
+ } else {
+ null
+ }
+ if (level.useMemory) {
+ // Put it in memory first, even if it also has useDisk set to true;
+ // We will drop it to disk later if the memory store can't hold it.
+ val putSucceeded = if (level.deserialized) {
+ saveDeserializedValuesToMemoryStore(blockData().toInputStream())
+ } else {
+ saveSerializedValuesToMemoryStore(readToByteBuffer())
+ }
+ if (!putSucceeded && level.useDisk) {
+ logWarning(s"Persisting block $blockId to disk instead.")
+ saveToDiskStore()
+ }
+ } else if (level.useDisk) {
+ saveToDiskStore()
+ }
+ val putBlockStatus = getCurrentBlockStatus(blockId, info)
+ val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+ if (blockWasSuccessfullyStored) {
+ // Now that the block is in either the memory or disk store,
+ // tell the master about it.
+ info.size = blockSize
+ if (tellMaster && info.tellMaster) {
+ reportBlockStatus(blockId, putBlockStatus)
+ }
+ addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
+ }
+ logDebug(s"Put block ${blockId} locally took
${Utils.getUsedTimeNs(startTimeNs)}")
+ if (level.replication > 1) {
+ // Wait for asynchronous replication to finish
+ try {
+ ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
+ } catch {
+ case NonFatal(t) =>
+ throw new Exception("Error occurred while waiting for
replication to finish", t)
+ }
+ }
+ if (blockWasSuccessfullyStored) {
+ None
+ } else {
+ Some(blockSize)
+ }
+ }.isEmpty
+ }
+ }
+
+ /**
+ * Helper for storing a block from bytes already in memory.
+ * '''Important!''' Callers must not mutate or release the data buffer
underlying `bytes`. Doing
+ * so may corrupt or change the data stored by the `BlockManager`.
+ */
+ private case class ByteBufferBlockStoreUpdater[T](
+ blockId: BlockId,
+ level: StorageLevel,
+ classTag: ClassTag[T],
+ bytes: ChunkedByteBuffer,
+ tellMaster: Boolean = true,
+ keepReadLock: Boolean = false)
+ extends BlockStoreUpdater[T](bytes.size, blockId, level, classTag,
tellMaster, keepReadLock) {
+
+ override def readToByteBuffer(): ChunkedByteBuffer = bytes
+
+ /**
+ * The ByteBufferBlockData wrapper is not disposed of to avoid releasing
buffers that are
+ * owned by the caller.
+ */
+ override def blockData(): BlockData = new ByteBufferBlockData(bytes, false)
+
+ override def saveToDiskStore(): Unit = diskStore.putBytes(blockId, bytes)
+
+ }
+
+ /**
+ * Helper for storing a block based on bytes already in a local temp file.
+ */
+ private case class TempFileBasedBlockStoreUpdater[T](
+ blockId: BlockId,
+ level: StorageLevel,
+ classTag: ClassTag[T],
+ tmpFile: File,
+ blockSize: Long,
+ tellMaster: Boolean = true,
+ keepReadLock: Boolean = false)
+ extends BlockStoreUpdater[T](blockSize, blockId, level, classTag,
tellMaster, keepReadLock) {
+
+ override def readToByteBuffer(): ChunkedByteBuffer = {
+ val allocator = level.memoryMode match {
+ case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+ case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+ }
+ blockData().toChunkedByteBuffer(allocator)
+ }
+
+ override def blockData(): BlockData = diskStore.getBytes(tmpFile,
blockSize)
+
+ override def saveToDiskStore(): Unit = diskStore.moveFileToBlock(tmpFile,
blockSize, blockId)
+
+ override def save(): Boolean = {
+ val res = super.save()
+ tmpFile.delete()
+ res
+ }
+
+ }
+
+ /**
* Initializes the BlockManager with the given appId. This is not performed
in the constructor as
* the appId may not be known at BlockManager instantiation time (in
particular for the driver,
* where it is only learned after registration with the TaskScheduler).
@@ -412,10 +594,7 @@ private[spark] class BlockManager(
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[_]): StreamCallbackWithID = {
- // TODO if we're going to only put the data in the disk store, we should
just write it directly
- // to the final location, but that would require a deeper refactor of this
code. So instead
- // we just write to a temp file, and call putBytes on the data in that
file.
- val tmpFile = diskBlockManager.createTempLocalBlock()._2
+ val (_, tmpFile) = diskBlockManager.createTempLocalBlock()
val channel = new CountingWritableChannel(
Channels.newChannel(serializerManager.wrapForEncryption(new
FileOutputStream(tmpFile))))
logTrace(s"Streaming block $blockId to tmp file $tmpFile")
@@ -431,28 +610,11 @@ private[spark] class BlockManager(
override def onComplete(streamId: String): Unit = {
logTrace(s"Done receiving block $blockId, now putting into local
blockManager")
- // Read the contents of the downloaded file as a buffer to put into
the blockManager.
// Note this is all happening inside the netty thread as soon as it
reads the end of the
// stream.
channel.close()
- // TODO SPARK-25035 Even if we're only going to write the data to disk
after this, we end up
- // using a lot of memory here. We'll read the whole file into a regular
- // byte buffer and OOM. We could at least read the tmp file as a
stream.
- val buffer = securityManager.getIOEncryptionKey() match {
- case Some(key) =>
- // we need to pass in the size of the unencrypted block
- val blockSize = channel.getCount
- val allocator = level.memoryMode match {
- case MemoryMode.ON_HEAP => ByteBuffer.allocate _
- case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
- }
- new EncryptedBlockData(tmpFile, blockSize, conf,
key).toChunkedByteBuffer(allocator)
-
- case None =>
- ChunkedByteBuffer.fromFile(tmpFile)
- }
- putBytes(blockId, buffer, level)(classTag)
- tmpFile.delete()
+ val blockSize = channel.getCount
+ TempFileBasedBlockStoreUpdater(blockId, level, classTag, tmpFile,
blockSize).save()
}
override def onFailure(streamId: String, cause: Throwable): Unit = {
@@ -953,111 +1115,14 @@ private[spark] class BlockManager(
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
require(bytes != null, "Bytes is null")
- doPutBytes(blockId, bytes, level, implicitly[ClassTag[T]], tellMaster)
- }
-
- /**
- * Put the given bytes according to the given level in one of the block
stores, replicating
- * the values if necessary.
- *
- * If the block already exists, this method will not overwrite it.
- *
- * '''Important!''' Callers must not mutate or release the data buffer
underlying `bytes`. Doing
- * so may corrupt or change the data stored by the `BlockManager`.
- *
- * @param keepReadLock if true, this method will hold the read lock when it
returns (even if the
- * block already exists). If false, this method will
hold no locks when it
- * returns.
- * @return true if the block was already present or if the put succeeded,
false otherwise.
- */
- private def doPutBytes[T](
- blockId: BlockId,
- bytes: ChunkedByteBuffer,
- level: StorageLevel,
- classTag: ClassTag[T],
- tellMaster: Boolean = true,
- keepReadLock: Boolean = false): Boolean = {
- doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock =
keepReadLock) { info =>
- val startTimeNs = System.nanoTime()
- // Since we're storing bytes, initiate the replication before storing
them locally.
- // This is faster as data is already serialized and ready to send.
- val replicationFuture = if (level.replication > 1) {
- Future {
- // This is a blocking action and should run in
futureExecutionContext which is a cached
- // thread pool. The ByteBufferBlockData wrapper is not disposed of
to avoid releasing
- // buffers that are owned by the caller.
- replicate(blockId, new ByteBufferBlockData(bytes, false), level,
classTag)
- }(futureExecutionContext)
- } else {
- null
- }
-
- val size = bytes.size
-
- if (level.useMemory) {
- // Put it in memory first, even if it also has useDisk set to true;
- // We will drop it to disk later if the memory store can't hold it.
- val putSucceeded = if (level.deserialized) {
- val values =
- serializerManager.dataDeserializeStream(blockId,
bytes.toInputStream())(classTag)
- memoryStore.putIteratorAsValues(blockId, values, classTag) match {
- case Right(_) => true
- case Left(iter) =>
- // If putting deserialized values in memory failed, we will put
the bytes directly to
- // disk, so we don't need this iterator and can close it to free
resources earlier.
- iter.close()
- false
- }
- } else {
- val memoryMode = level.memoryMode
- memoryStore.putBytes(blockId, size, memoryMode, () => {
- if (memoryMode == MemoryMode.OFF_HEAP &&
- bytes.chunks.exists(buffer => !buffer.isDirect)) {
- bytes.copy(Platform.allocateDirectBuffer)
- } else {
- bytes
- }
- })
- }
- if (!putSucceeded && level.useDisk) {
- logWarning(s"Persisting block $blockId to disk instead.")
- diskStore.putBytes(blockId, bytes)
- }
- } else if (level.useDisk) {
- diskStore.putBytes(blockId, bytes)
- }
-
- val putBlockStatus = getCurrentBlockStatus(blockId, info)
- val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
- if (blockWasSuccessfullyStored) {
- // Now that the block is in either the memory or disk store,
- // tell the master about it.
- info.size = size
- if (tellMaster && info.tellMaster) {
- reportBlockStatus(blockId, putBlockStatus)
- }
- addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
- }
- logDebug(s"Put block ${blockId} locally took
${Utils.getUsedTimeNs(startTimeNs)}")
- if (level.replication > 1) {
- // Wait for asynchronous replication to finish
- try {
- ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
- } catch {
- case NonFatal(t) =>
- throw new Exception("Error occurred while waiting for replication
to finish", t)
- }
- }
- if (blockWasSuccessfullyStored) {
- None
- } else {
- Some(bytes)
- }
- }.isEmpty
+ val blockStoreUpdater =
+ ByteBufferBlockStoreUpdater(blockId, level, implicitly[ClassTag[T]],
bytes, tellMaster)
+ blockStoreUpdater.save()
}
/**
- * Helper method used to abstract common code from [[doPutBytes()]] and
[[doPutIterator()]].
+ * Helper method used to abstract common code from
[[BlockStoreUpdater.save()]]
+ * and [[doPutIterator()]].
*
* @param putBody a function which attempts the actual put() and returns
None on success
* or Some on failure.
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index aefa2ae..fbda491 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.ListBuffer
import com.google.common.io.Closeables
import io.netty.channel.DefaultFileRegion
+import org.apache.commons.io.FileUtils
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.{config, Logging}
@@ -95,18 +96,17 @@ private[spark] class DiskStore(
}
def getBytes(blockId: BlockId): BlockData = {
- val file = diskManager.getFile(blockId.name)
- val blockSize = getSize(blockId)
+ getBytes(diskManager.getFile(blockId.name), getSize(blockId))
+ }
- securityManager.getIOEncryptionKey() match {
- case Some(key) =>
- // Encrypted blocks cannot be memory mapped; return a special object
that does decryption
- // and provides InputStream / FileRegion implementations for reading
the data.
- new EncryptedBlockData(file, blockSize, conf, key)
+ def getBytes(f: File, blockSize: Long): BlockData =
securityManager.getIOEncryptionKey() match {
+ case Some(key) =>
+ // Encrypted blocks cannot be memory mapped; return a special object
that does decryption
+ // and provides InputStream / FileRegion implementations for reading the
data.
+ new EncryptedBlockData(f, blockSize, conf, key)
- case _ =>
- new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file,
blockSize)
- }
+ case _ =>
+ new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, f, blockSize)
}
def remove(blockId: BlockId): Boolean = {
@@ -123,6 +123,16 @@ private[spark] class DiskStore(
}
}
+ /**
+ * @param blockSize if encryption is configured, the file is assumed to
already be encrypted and
+ * blockSize should be the decrypted size
+ */
+ def moveFileToBlock(sourceFile: File, blockSize: Long, targetBlockId:
BlockId): Unit = {
+ blockSizes.put(targetBlockId, blockSize)
+ val targetFile = diskManager.getFile(targetBlockId.name)
+ FileUtils.moveFile(sourceFile, targetFile)
+ }
+
def contains(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name)
file.exists()
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 40c6424..ac35ac3 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -80,6 +80,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers
with BeforeAndAfterE
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId)
+ private def init(sparkConf: SparkConf): Unit = {
+ sparkConf
+ .set("spark.app.id", "test")
+ .set(IS_TESTING, true)
+ .set(MEMORY_FRACTION, 1.0)
+ .set(MEMORY_STORAGE_FRACTION, 0.999)
+ .set("spark.kryoserializer.buffer", "1m")
+ .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
+ }
+
private def makeBlockManager(
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER,
@@ -113,12 +123,7 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with BeforeAndAfterE
// Set the arch to 64-bit and compressedOops to true to get a
deterministic test-case
System.setProperty("os.arch", "amd64")
conf = new SparkConf(false)
- .set("spark.app.id", "test")
- .set(IS_TESTING, true)
- .set(MEMORY_FRACTION, 1.0)
- .set(MEMORY_STORAGE_FRACTION, 0.999)
- .set("spark.kryoserializer.buffer", "1m")
- .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
+ init(conf)
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
conf.set(DRIVER_PORT, rpcEnv.address.port)
@@ -890,7 +895,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers
with BeforeAndAfterE
val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv,
master,
serializerManager, conf, memoryManager, mapOutputTracker,
shuffleManager, transfer, securityMgr, 0)
- memoryManager.setMemoryStore(store.memoryStore)
store.initialize("app-id")
// The put should fail since a1 is not serializable.
@@ -906,6 +910,48 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with BeforeAndAfterE
}
}
+ def testPutBlockDataAsStream(blockManager: BlockManager, storageLevel:
StorageLevel): Unit = {
+ val message = "message"
+ val ser = serializer.newInstance().serialize(message).array()
+ val blockId = new RDDBlockId(0, 0)
+ val streamCallbackWithId =
+ blockManager.putBlockDataAsStream(blockId, storageLevel,
ClassTag(message.getClass))
+ streamCallbackWithId.onData("0", ByteBuffer.wrap(ser))
+ streamCallbackWithId.onComplete("0")
+ val blockStatusOption = blockManager.getStatus(blockId)
+ assert(!blockStatusOption.isEmpty)
+ val blockStatus = blockStatusOption.get
+ assert((blockStatus.diskSize > 0) === !storageLevel.useMemory)
+ assert((blockStatus.memSize > 0) === storageLevel.useMemory)
+ assert(blockManager.getBlockData(blockId).nioByteBuffer().array() === ser)
+ }
+
+ Seq(
+ "caching" -> StorageLevel.MEMORY_ONLY,
+ "caching, serialized" -> StorageLevel.MEMORY_ONLY_SER,
+ "caching on disk" -> StorageLevel.DISK_ONLY
+ ).foreach { case (name, storageLevel) =>
+ encryptionTest(s"test putBlockDataAsStream with $name") { conf =>
+ init(conf)
+ val ioEncryptionKey =
+ if (conf.get(IO_ENCRYPTION_ENABLED))
Some(CryptoStreamUtils.createKey(conf)) else None
+ val securityMgr = new SecurityManager(conf, ioEncryptionKey)
+ val serializerManager = new SerializerManager(serializer, conf,
ioEncryptionKey)
+ val transfer =
+ new NettyBlockTransferService(conf, securityMgr, "localhost",
"localhost", 0, 1)
+ val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
+ val blockManager = new BlockManager(SparkContext.DRIVER_IDENTIFIER,
rpcEnv, master,
+ serializerManager, conf, memoryManager, mapOutputTracker,
+ shuffleManager, transfer, securityMgr, 0)
+ try {
+ blockManager.initialize("app-id")
+ testPutBlockDataAsStream(blockManager, storageLevel)
+ } finally {
+ blockManager.stop()
+ }
+ }
+ }
+
test("turn off updated block statuses") {
val conf = new SparkConf()
conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, false)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]