This is an automated email from the ASF dual-hosted git repository.
ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 94cccad9e3f2 [SPARK-53755][CORE] Add log support in BlockManager
94cccad9e3f2 is described below
commit 94cccad9e3f226e841329490bdfbf73601880541
Author: Tengfei Huang <[email protected]>
AuthorDate: Mon Oct 20 17:54:19 2025 -0700
[SPARK-53755][CORE] Add log support in BlockManager
### What changes were proposed in this pull request?
Add log support in `BlockManager` to help collect and analyze logs for
live debugging. A few new components have been introduced for that:
1. `LogBlockId` to represent log data, which can be used for filtering
log data, and `LogBlockType` to identify different kinds of logs.
2. `LogLine` to define the log structure.
3. `LogBlockWriter` to write a single log block.
4. `RollingLogWriter` to write logs in a rolling manner, splitting
blocks by size (approximately).
Usage Example:
```
// Define a blockId generator which can help to generate unique block ids.
val logBlockIdGenerator = new LogBlockIdGenerator {
  override def logBlockType: LogBlockType = LogBlockType.TEST

  override protected def genUniqueBlockId(
      lastLogTime: Long, executorId: String): LogBlockId = {
    TestLogBlockId(lastLogTime, executorId)
  }
}

// Get a log writer and write logs.
val logBlockWriter = store.getRollingLogWriter(logBlockIdGenerator)
logBlockWriter.writeLog(TestLogLine(0L, 1, "Log message 1"))
logBlockWriter.writeLog(TestLogLine(1L, 2, "Log message 2"))
logBlockWriter.writeLog(TestLogLine(2L, 3, "Log message 3"))
logBlockWriter.writeLog(TestLogLine(3L, 4, "Log message 4"))
// Close writer after writing all logs.
logBlockWriter.close()
```
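Log blocks written this way are stored with `StorageLevel.DISK_ONLY` and can be read back
through the regular `BlockManager` APIs, mirroring what the added tests do (a minimal sketch,
assuming `store` is the same `BlockManager` instance as above):
```
// List the stored log blocks of this type and read their records back.
val logBlockIds = store.getMatchingBlockIds(_.isInstanceOf[TestLogBlockId])
logBlockIds.foreach { blockId =>
  val logs = store.get[TestLogLine](blockId).get.data.toSeq
  logs.foreach(println)
}
```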
### Why are the changes needed?
Collect and analyze logs from Spark jobs to make live debugging easier.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT added.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52643 from ivoson/SPARK-53755.
Authored-by: Tengfei Huang <[email protected]>
Signed-off-by: Takuya Ueshin <[email protected]>
---
.../spark/serializer/SerializerManager.scala | 15 +-
.../scala/org/apache/spark/storage/BlockId.scala | 40 +++++
.../org/apache/spark/storage/BlockManager.scala | 32 +++-
.../apache/spark/storage/LogBlockIdGenerator.scala | 49 ++++++
.../org/apache/spark/storage/LogBlockWriter.scala | 182 +++++++++++++++++++++
.../scala/org/apache/spark/storage/LogLine.scala | 48 ++++++
.../apache/spark/storage/RollingLogWriter.scala | 115 +++++++++++++
.../apache/spark/storage/BlockManagerSuite.scala | 79 +++++++++
.../apache/spark/storage/LogBlockWriterSuite.scala | 137 ++++++++++++++++
9 files changed, 691 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 640396a69526..d53a4d549782 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -172,9 +172,8 @@ private[spark] class SerializerManager(
outputStream: OutputStream,
values: Iterator[T]): Unit = {
val byteStream = new BufferedOutputStream(outputStream)
- val autoPick = !blockId.isInstanceOf[StreamBlockId]
- val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
- ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
+ blockSerializationStream[T](blockId, byteStream)(implicitly[ClassTag[T]])
+ .writeAll(values).close()
}
/** Serializes into a chunked byte buffer. */
@@ -212,4 +211,14 @@ private[spark] class SerializerManager(
.deserializeStream(wrapForCompression(blockId, stream))
.asIterator.asInstanceOf[Iterator[T]]
}
+
+ /** Generate a `SerializationStream` for a block. */
+ private[spark] def blockSerializationStream[T](
+ blockId: BlockId,
+ outputStream: OutputStream)
+ (classTag: ClassTag[T]): SerializationStream = {
+ val autoPick = !blockId.isInstanceOf[StreamBlockId]
+ val ser = getSerializer(classTag, autoPick).newInstance()
+ ser.serializeStream(wrapForCompression(blockId, outputStream))
+ }
}
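The new `blockSerializationStream` helper exposes the same serializer and compression selection
that `dataSerializeStream` uses, but hands back the raw `SerializationStream` so callers can write
records incrementally. This is how the `LogBlockWriter` introduced below opens its stream; a
minimal sketch of the same pattern (assuming a `SerializerManager` instance `serializerManager`
and an open `OutputStream` `out`):
```
// Pick the serializer and compression codec as BlockManager would for this block id,
// then stream individual LogLine records into `out`.
val blockId = LogBlockId.empty(LogBlockType.TEST)
val stream = serializerManager
  .blockSerializationStream(blockId, out)(LogLine.getClassTag(LogBlockType.TEST))
stream.writeObject(TestLogLine(0L, 1L, "Log message 1"))
stream.close()
```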
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 6eb015d56b2c..a15426783ebe 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -23,6 +23,7 @@ import org.apache.spark.SparkException
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.network.shuffle.RemoteBlockPushResolver
+import org.apache.spark.storage.LogBlockType.LogBlockType
/**
* :: DeveloperApi ::
@@ -175,6 +176,42 @@ case class PythonStreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
override def name: String = "python-stream-" + streamId + "-" + uniqueId
}
+object LogBlockType extends Enumeration {
+ type LogBlockType = Value
+ val TEST = Value
+}
+
+/**
+ * Identifies a block of log data.
+ *
+ * @param lastLogTime the timestamp of the last log entry in this block, used for filtering
+ * and log management.
+ * @param executorId the ID of the executor that produced this log block.
+ */
+abstract sealed class LogBlockId(
+ val lastLogTime: Long,
+ val executorId: String) extends BlockId {
+ def logBlockType: LogBlockType
+}
+
+object LogBlockId {
+ def empty(logBlockType: LogBlockType): LogBlockId = {
+ logBlockType match {
+ case LogBlockType.TEST => TestLogBlockId(0L, "")
+ case _ => throw new SparkException(s"Unsupported log block type: $logBlockType")
+ }
+ }
+}
+
+// Used for test purpose only.
+case class TestLogBlockId(override val lastLogTime: Long, override val executorId: String)
+ extends LogBlockId(lastLogTime, executorId) {
+ override def name: String =
+ "test_log_" + lastLogTime + "_" + executorId
+
+ override def logBlockType: LogBlockType = LogBlockType.TEST
+}
+
/** Id associated with temporary local data managed as blocks. Not serializable. */
private[spark] case class TempLocalBlockId(id: UUID) extends BlockId {
override def name: String = "temp_local_" + id
@@ -222,6 +259,7 @@ object BlockId {
val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
val TEST = "test_(.*)".r
+ val TEST_LOG_BLOCK = "test_log_([0-9]+)_(.*)".r
def apply(name: String): BlockId = name match {
case RDD(rddId, splitIndex) =>
@@ -262,6 +300,8 @@ object BlockId {
TempLocalBlockId(UUID.fromString(uuid))
case TEMP_SHUFFLE(uuid) =>
TempShuffleBlockId(UUID.fromString(uuid))
+ case TEST_LOG_BLOCK(lastLogTime, executorId) =>
+ TestLogBlockId(lastLogTime.toLong, executorId)
case TEST(value) =>
TestBlockId(value)
case _ => throw SparkCoreErrors.unrecognizedBlockIdError(name)
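Note that the new `TEST_LOG_BLOCK` pattern is matched before the generic `TEST` pattern, so
`test_log_` names are not swallowed by it and round-trip through `BlockId.apply` (an illustrative
sketch):
```
val id = TestLogBlockId(1234L, "exec-1")
assert(id.name == "test_log_1234_exec-1")
assert(BlockId(id.name) == id)
assert(id.logBlockType == LogBlockType.TEST)
```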
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1e1cb8bf9fd5..602e8d068fb6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -58,6 +58,7 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, MigratableResolver, ShuffleManager, ShuffleWriteMetricsReporter}
import org.apache.spark.storage.BlockManagerMessages.{DecommissionBlockManager, ReplicateBlock}
+import org.apache.spark.storage.LogBlockType.LogBlockType
import org.apache.spark.storage.memory._
import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
@@ -294,12 +295,17 @@ private[spark] class BlockManager(
decommissioner.isDefined
}
- @inline final private def checkShouldStore(blockId: BlockId) = {
+ @inline final private def checkShouldStore(blockId: BlockId, level: StorageLevel) = {
// Don't reject broadcast blocks since they may be stored during task exec and
// don't need to be migrated.
if (isDecommissioning() && !blockId.isBroadcast) {
throw SparkCoreErrors.cannotSaveBlockOnDecommissionedExecutorError(blockId)
}
+ if (blockId.isInstanceOf[LogBlockId] && level != StorageLevel.DISK_ONLY) {
+ throw SparkException.internalError(
+ s"Cannot store log block $blockId with storage level $level. " +
+ "Log blocks must be stored with DISK_ONLY.")
+ }
}
// This is a lazy val so someone can migrating RDDs even if they don't have a MigratableResolver
@@ -763,7 +769,7 @@ private[spark] class BlockManager(
level: StorageLevel,
classTag: ClassTag[_]): StreamCallbackWithID = {
- checkShouldStore(blockId)
+ checkShouldStore(blockId, level)
if (blockId.isShuffle) {
logDebug(s"Putting shuffle block ${blockId}")
@@ -1483,6 +1489,26 @@ private[spark] class BlockManager(
syncWrites, writeMetrics, blockId)
}
+ /**
+ * To get a log block writer that can write logs directly to a disk block. Either `save` or
+ * `close` should be called to finish the writing and release opened resources.
+ * `save` would write the block to the block manager, while `close` would just close the writer.
+ */
+ def getLogBlockWriter(
+ logBlockType: LogBlockType): LogBlockWriter = {
+ new LogBlockWriter(this, logBlockType, conf)
+ }
+
+ /**
+ * To get a rolling log writer that can write logs to block manager and split the logs
+ * to multiple blocks if the log size exceeds the threshold.
+ */
+ def getRollingLogWriter(
+ blockIdGenerator: LogBlockIdGenerator,
+ rollingSize: Long = 33554432L): RollingLogWriter = {
+ new RollingLogWriter(this, blockIdGenerator, rollingSize)
+ }
+
/**
* Put a new block of serialized bytes to the block manager.
*
@@ -1540,7 +1566,7 @@ private[spark] class BlockManager(
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
- checkShouldStore(blockId)
+ checkShouldStore(blockId, level)
val putBlockInfo = {
val newInfo = new BlockInfo(level, classTag, tellMaster)
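With the storage level passed into `checkShouldStore`, any attempt to store a `LogBlockId` with a
level other than DISK_ONLY now fails fast. A minimal sketch of the behaviour, following the added
`BlockManagerSuite` test (assuming `store` is a `BlockManager`):
```
val logBlockId = TestLogBlockId(1234L, store.executorId)
val data = Seq("log line 1", "log line 2")

// Any level other than DISK_ONLY is rejected for log blocks.
intercept[SparkException] {
  store.putIterator[String](logBlockId, data.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}

// DISK_ONLY succeeds.
assert(store.putIterator[String](logBlockId, data.iterator, StorageLevel.DISK_ONLY, tellMaster = true))
```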
diff --git a/core/src/main/scala/org/apache/spark/storage/LogBlockIdGenerator.scala b/core/src/main/scala/org/apache/spark/storage/LogBlockIdGenerator.scala
new file mode 100644
index 000000000000..4a2b90677ba3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/LogBlockIdGenerator.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.apache.spark.SparkException
+import org.apache.spark.storage.LogBlockType.LogBlockType
+
+/**
+ * LogBlockIdGenerator is responsible for generating unique LogBlockIds for log blocks.
+ */
+trait LogBlockIdGenerator {
+ // The log block type that this generator supports.
+ def logBlockType: LogBlockType
+
+ // Generates a unique LogBlockId based on the last log time and executor ID.
+ protected def genUniqueBlockId(lastLogTime: Long, executorId: String): LogBlockId
+
+ /**
+ * Generates a new LogBlockId based on the last log time and executor ID. Make sure that
+ * the generated LogBlockId has the same log block type as this generator.
+ *
+ * @param lastLogTime The timestamp of the last log entry.
+ * @param executorId The ID of the executor generating the log block.
+ */
+ final def nextBlockId(lastLogTime: Long, executorId: String): LogBlockId = {
+ val blockId = genUniqueBlockId(lastLogTime, executorId)
+ if (blockId.logBlockType != this.logBlockType) {
+ throw SparkException.internalError(
+ "BlockId generated by LogBlockIdGenerator does not match " +
+ s"the expected log block type: ${blockId.logBlockType} != ${this.logBlockType}")
+ }
+ blockId
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/LogBlockWriter.scala b/core/src/main/scala/org/apache/spark/storage/LogBlockWriter.scala
new file mode 100644
index 000000000000..5645f59d383d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/LogBlockWriter.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.BufferedOutputStream
+import java.io.File
+import java.io.FileOutputStream
+
+import org.apache.commons.io.output.CountingOutputStream
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.SerializationStream
+import org.apache.spark.storage.LogBlockType.LogBlockType
+import org.apache.spark.util.Utils
+
+/**
+ * A class for writing logs directly to a file on disk and save as a block in BlockManager if
+ * there are any logs written.
+ * `save` or `close` must be called to ensure resources are released properly. `save` will add
+ * the log block to BlockManager, while `close` will just release the resources without saving
+ * the log block.
+ *
+ * Notes:
+ * - This class does not support concurrent writes.
+ * - The writer will be automatically closed when failed to write logs or failed to save the
+ * log block.
+ * - Write operations after closing will throw exceptions.
+ */
+private[spark] class LogBlockWriter(
+ blockManager: BlockManager,
+ logBlockType: LogBlockType,
+ sparkConf: SparkConf,
+ bufferSize: Int = 32 * 1024) extends Logging {
+
+ private[storage] var tmpFile: File = null
+
+ private var cos: CountingOutputStream = null
+ private var objOut: SerializationStream = null
+ private var hasBeenClosed = false
+ private var recordsWritten = false
+ private var totalBytesWritten = 0
+
+ initialize()
+
+ private def initialize(): Unit = {
+ try {
+ val dir = new File(Utils.getLocalDir(sparkConf))
+ tmpFile = File.createTempFile(s"spark_log_$logBlockType", "", dir)
+ val fos = new FileOutputStream(tmpFile, false)
+ val bos = new BufferedOutputStream(fos, bufferSize)
+ cos = new CountingOutputStream(bos)
+ val emptyBlockId = LogBlockId.empty(logBlockType)
+ objOut = blockManager
+ .serializerManager
+ .blockSerializationStream(emptyBlockId, cos)(LogLine.getClassTag(logBlockType))
+ } catch {
+ case e: Exception =>
+ logError(log"Failed to initialize LogBlockWriter.", e)
+ close()
+ throw e
+ }
+ }
+
+ def bytesWritten(): Int = {
+ Option(cos)
+ .map(_.getCount)
+ .getOrElse(totalBytesWritten)
+ }
+
+ /**
+ * Write a log entry to the log block. Exception will be thrown if the writer has been closed
+ * or if there is an error during writing. Caller needs to deal with the exception. Suggest to
+ * close the writer when exception is thrown as block data could be corrupted which would lead
+ * to issues when reading the log block later.
+ *
+ * @param logEntry The log entry to write.
+ */
+ def writeLog(logEntry: LogLine): Unit = {
+ if (hasBeenClosed) {
+ throw SparkException.internalError(
+ "Writer already closed. Cannot write more data.",
+ category = "STORAGE"
+ )
+ }
+
+ try {
+ objOut.writeObject(logEntry)
+ recordsWritten = true
+ } catch {
+ case e: Exception =>
+ logError(log"Failed to write log entry.", e)
+ throw e
+ }
+ }
+
+ def save(blockId: LogBlockId): Unit = {
+ if (hasBeenClosed) {
+ throw SparkException.internalError(
+ "Writer already closed. Cannot save.",
+ category = "STORAGE"
+ )
+ }
+
+ try {
+ if (blockId.logBlockType != logBlockType) {
+ throw SparkException.internalError(
+ s"LogBlockWriter is for $logBlockType, but got blockId $blockId")
+ }
+
+ objOut.flush()
+ objOut.close()
+ objOut = null
+
+ if (recordsWritten) {
+ totalBytesWritten = cos.getCount
+ // Save log block to BlockManager and delete the tmpFile.
+ val success = saveToBlockManager(blockId, totalBytesWritten)
+ if (!success) {
+ throw SparkException.internalError(s"Failed to save log block $blockId to BlockManager")
+ }
+ }
+ } finally {
+ close()
+ }
+ }
+
+ def close(): Unit = {
+ if (hasBeenClosed) {
+ return
+ }
+
+ try {
+ if (objOut != null) {
+ objOut.close()
+ }
+ if (tmpFile != null && tmpFile.exists()) {
+ tmpFile.delete()
+ }
+ } catch {
+ case e: Exception =>
+ logWarning(log"Failed to close resources of LogBlockWriter", e)
+ } finally {
+ objOut = null
+ cos = null
+ hasBeenClosed = true
+ }
+ }
+
+ // For test only.
+ private[storage] def flush(): Unit = {
+ if (objOut != null) {
+ objOut.flush()
+ }
+ }
+
+ private[storage] def saveToBlockManager(blockId: LogBlockId, blockSize: Long): Boolean = {
+ blockManager.TempFileBasedBlockStoreUpdater(
+ blockId,
+ StorageLevel.DISK_ONLY,
+ LogLine.getClassTag(logBlockType),
+ tmpFile,
+ blockSize)
+ .save()
+ }
+}
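Putting the pieces together, the writer's lifecycle is: write entries, then either `save(blockId)`
to publish the block or `close()` to discard it. A minimal sketch based on the added
`BlockManagerSuite` test (assuming `store` is a `BlockManager`):
```
val writer = store.getLogBlockWriter(LogBlockType.TEST)
val blockId = TestLogBlockId(1L, store.executorId)

writer.writeLog(TestLogLine(0L, 1, "Log message 1"))
writer.writeLog(TestLogLine(1L, 2, "Log message 2"))

// Publishes the temp file as a DISK_ONLY block and closes the writer;
// any further writeLog/save on this writer will throw.
writer.save(blockId)
```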
diff --git a/core/src/main/scala/org/apache/spark/storage/LogLine.scala b/core/src/main/scala/org/apache/spark/storage/LogLine.scala
new file mode 100644
index 000000000000..dc646f289e37
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/LogLine.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.reflect.ClassTag
+import scala.reflect.classTag
+
+/**
+ * Base class representing a log line.
+ *
+ * @param eventTime timestamp in milliseconds when the log is written
+ * @param sequenceId sequence ID of the log line
+ * @param message log message
+ */
+trait LogLine {
+ val eventTime: Long
+ val sequenceId: Long
+ val message: String
+}
+
+object LogLine {
+ def getClassTag(logBlockType: LogBlockType.LogBlockType): ClassTag[_<:LogLine] =
+ logBlockType match {
+ case LogBlockType.TEST =>
+ classTag[TestLogLine]
+ case unsupportedLogBlockType =>
+ throw new RuntimeException("Not supported log type " + unsupportedLogBlockType)
+ }
+}
+
+case class TestLogLine(eventTime: Long, sequenceId: Long, message: String)
+ extends LogLine {
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/RollingLogWriter.scala b/core/src/main/scala/org/apache/spark/storage/RollingLogWriter.scala
new file mode 100644
index 000000000000..d925d7b79f46
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/RollingLogWriter.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.LogKeys.{BLOCK_ID, BYTE_SIZE}
+import org.apache.spark.storage.LogBlockType.LogBlockType
+
+/**
+ * Rolling log writer that writes log entries to blocks in a rolling manner. Here we split
+ * log blocks based on size limit.
+ *
+ * @param blockManager BlockManager to manage log blocks.
+ * @param blockIdGenerator BlockId generator to generate unique block IDs for log blocks.
+ * @param rollingSize Size limit for each log block. Default is 32MB (33554432 bytes).
+ */
+private[spark] class RollingLogWriter(
+ blockManager: BlockManager,
+ blockIdGenerator: LogBlockIdGenerator,
+ rollingSize: Long = 33554432L) extends Logging {
+ private var currentBlockWriter: Option[LogBlockWriter] = None
+ private var lastLogTime: Long = 0L
+ private val logBlockType: LogBlockType = blockIdGenerator.logBlockType
+
+ private def shouldRollOver: Boolean = {
+ currentBlockWriter match {
+ case Some(writer) => writer.bytesWritten() >= rollingSize
+ case None => false
+ }
+ }
+
+ /**
+ * Write a log entry. If the current block writer is empty, it will create a new one.
+ * If the current block exceeds the rolling size, it will roll over to a new block for
+ * the next log entry.
+ *
+ * @param logEntry log entry to write.
+ * @param removeBlockOnException if true, current log block will be deleted without saving to
+ * BlockManager. Otherwise, no action will be taken on current
+ * block which might be corrupted.
+ */
+ def writeLog(logEntry: LogLine, removeBlockOnException: Boolean = false): Unit = {
+ // Create a new log writer if it's empty
+ if (currentBlockWriter.isEmpty) {
+ currentBlockWriter = Some(blockManager.getLogBlockWriter(logBlockType))
+ }
+
+ try {
+ currentBlockWriter.foreach { writer =>
+ writer.writeLog(logEntry)
+ lastLogTime = logEntry.eventTime
+ }
+ } catch {
+ case e: Exception =>
+ if (removeBlockOnException) {
+ logError(log"Failed to write log, closing block without saving.", e)
+ currentBlockWriter.foreach(_.close())
+ currentBlockWriter = None
+ }
+
+ throw e
+ }
+
+ if (shouldRollOver) {
+ rollOver()
+ }
+ }
+
+ def rollOver(): Unit = {
+ // Save current block and reset the writer
+ try {
+ saveCurrentBlock()
+ } finally {
+ currentBlockWriter = None
+ }
+ }
+
+ def close(): Unit = {
+ try {
+ saveCurrentBlock()
+ } finally {
+ currentBlockWriter = None
+ lastLogTime = 0L
+ }
+ }
+
+ // For test purpose.
+ private[storage] def flush(): Unit = {
+ currentBlockWriter.foreach(_.flush())
+ }
+
+ private def saveCurrentBlock(): Unit = {
+ currentBlockWriter.foreach { writer =>
+ val blockId = blockIdGenerator.nextBlockId(lastLogTime, blockManager.executorId)
+ logInfo(log"Saving log block ${MDC(BLOCK_ID, blockId)} with " +
+ log"approximate size: ${MDC(BYTE_SIZE, writer.bytesWritten())} bytes.")
+ writer.save(blockId)
+ }
+ }
+}
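The rollover check runs after each write, so a block can slightly exceed `rollingSize` before it
is sealed, and the split is approximate because the serialization stream buffers data until it is
flushed. A minimal sketch with a small threshold, as in the rolling test (assuming `store` is a
`BlockManager` and `logBlockIdGenerator` is the generator from the usage example above):
```
// Roll to a new block roughly every 100 bytes instead of the 32MB default.
val writer = store.getRollingLogWriter(logBlockIdGenerator, rollingSize = 100L)
writer.writeLog(TestLogLine(0L, 1, "Log message 1"))
writer.writeLog(TestLogLine(1L, 2, "Log message 2"))
// Once bytesWritten() reaches the threshold, the current block is saved and the
// next writeLog starts a new one.
writer.writeLog(TestLogLine(2L, 3, "Log message 3"))
writer.close()  // saves the last block, if any
```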
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 5b86345dd5f9..95a315a486ff 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -64,6 +64,8 @@ import org.apache.spark.serializer.{DeserializationStream, JavaSerializer, KryoD
import org.apache.spark.shuffle.{MigratableResolver, ShuffleBlockInfo, ShuffleBlockResolver, ShuffleManager}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.storage.LogBlockType.LogBlockType
+import org.apache.spark.storage.StorageLevel._
import org.apache.spark.util._
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.collection.Utils.createArray
@@ -2504,6 +2506,83 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
assert(acc.value === 6)
}
+ test("SPARK-53755: LogBlock should be DISK_ONLY") {
+ val store = makeBlockManager(8000, "executor1")
+ val data = Seq("log line 1", "log line 2")
+ val logBlockId = TestLogBlockId(1234L, store.executorId)
+
+ Seq(DISK_ONLY_2, DISK_ONLY_3,
+ MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_ONLY_SER,
+ MEMORY_ONLY_SER_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2,
+ MEMORY_AND_DISK_SER, MEMORY_AND_DISK_SER_2, OFF_HEAP).foreach { level =>
+ val exception = intercept[SparkException] {
+ store.putIterator[String](logBlockId, data.iterator, level, tellMaster = true)
+ }
+ assert(exception.getMessage.contains("Log blocks must be stored with DISK_ONLY."))
+ }
+
+ assert(store.putIterator[String](logBlockId, data.iterator, DISK_ONLY, tellMaster = true))
+ }
+
+ test("SPARK-53755: Log block write/read") {
+ val store = makeBlockManager(8000, "executor1")
+ val logBlockWriter = store.getLogBlockWriter(LogBlockType.TEST)
+ val logBlockId = TestLogBlockId(1L, store.executorId)
+ val log1 = TestLogLine(0L, 1, "Log message 1")
+ val log2 = TestLogLine(1L, 2, "Log message 2")
+
+ logBlockWriter.writeLog(log1)
+ logBlockWriter.writeLog(log2)
+ logBlockWriter.save(logBlockId)
+
+ val status = store.getStatus(logBlockId)
+ assert(status.isDefined)
+ status.foreach { s =>
+ assert(s.storageLevel === DISK_ONLY)
+ assert(s.memSize === 0)
+ assert(s.diskSize > 0)
+ }
+
+ val data = store.get[TestLogLine](logBlockId).get.data.toSeq
+ assert(data === Seq(log1, log2))
+ }
+
+ test("SPARK-53755: rolling log block write/read") {
+ val store = makeBlockManager(8000, "executor1")
+
+ val logBlockIdGenerator = new LogBlockIdGenerator {
+ override def logBlockType: LogBlockType = LogBlockType.TEST
+
+ override protected def genUniqueBlockId(
+ lastLogTime: Long, executorId: String): LogBlockId = {
+ TestLogBlockId(lastLogTime, executorId)
+ }
+ }
+
+ val logBlockWriter = store.getRollingLogWriter(logBlockIdGenerator, 100)
+ val log1 = TestLogLine(0L, 1, "Log message 1")
+ val log2 = TestLogLine(1L, 2, "Log message 2")
+ val log3 = TestLogLine(2L, 3, "Log message 3")
+ val log4 = TestLogLine(3L, 4, "Log message 4")
+
+ // 65 bytes for each log line, 2 log lines for each block
+ logBlockWriter.writeLog(log1)
+ logBlockWriter.writeLog(log2)
+ // Flush and update bytes written, so that the next write will go to a new block.
+ logBlockWriter.flush()
+ logBlockWriter.writeLog(log3)
+ logBlockWriter.writeLog(log4)
+ logBlockWriter.close()
+
+ val logBlockId1 = TestLogBlockId(2L, store.executorId)
+ val logBlockId2 = TestLogBlockId(3L, store.executorId)
+ val logBlockIds = store
+ .getMatchingBlockIds(_.isInstanceOf[TestLogBlockId])
+ .distinct
+ assert(logBlockIds.size === 2)
+ assert(logBlockIds.contains(logBlockId1) && logBlockIds.contains(logBlockId2))
+ }
+
private def createKryoSerializerWithDiskCorruptedInputStream(): KryoSerializer = {
class TestDiskCorruptedInputStream extends InputStream {
override def read(): Int = throw new IOException("Input/output error")
diff --git a/core/src/test/scala/org/apache/spark/storage/LogBlockWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/LogBlockWriterSuite.scala
new file mode 100644
index 000000000000..2e06e5d150d5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/LogBlockWriterSuite.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.storage
+
+import java.io.File
+import java.nio.file.Files
+
+import org.mockito.ArgumentMatchers.{any, anyLong}
+import org.mockito.Mockito.{doAnswer, doThrow, mock, spy, times, verify}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
+import org.apache.spark.util.Utils
+
+class LogBlockWriterSuite extends SparkFunSuite {
+ var tempDir: File = _
+ var sparkConf: SparkConf = _
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ tempDir = Utils.createTempDir()
+ sparkConf = new SparkConf(false)
+ .set("spark.local.dir", tempDir.getAbsolutePath)
+ }
+
+ override def afterEach(): Unit = {
+ try {
+ Utils.deleteRecursively(tempDir)
+ } finally {
+ Utils.clearLocalRootDirs()
+ super.afterEach()
+ }
+ }
+
+ test("SPARK-53755: close resources when failed to initialize") {
+ val blockManager = mock(classOf[BlockManager])
+ val serializerManager =
+ spy(new SerializerManager(new JavaSerializer(sparkConf), sparkConf, None))
+ doAnswer(_ => serializerManager).when(blockManager).serializerManager
+ doThrow(new RuntimeException("Initialization failed"))
+ .when(serializerManager).blockSerializationStream(any, any)(any)
+
+ intercept[RuntimeException] {
+ new LogBlockWriter(
+ blockManager, LogBlockType.TEST, sparkConf)
+ }
+ verify(serializerManager, times(1)).blockSerializationStream(any, any)(any)
+ val leafFiles = Files.walk(tempDir.toPath)
+ .filter(Files.isRegularFile(_))
+ .toArray
+ assert(leafFiles.isEmpty, "Temporary file should be deleted.")
+ }
+
+ test("SPARK-53755: bytes written stats") {
+ val logBlockWriter = makeLogBlockWriter()
+
+ val log1 = TestLogLine(0L, 1, "Log message 1")
+ val log2 = TestLogLine(1L, 2, "Log message 2")
+ try {
+ logBlockWriter.writeLog(log1)
+ logBlockWriter.writeLog(log2)
+ logBlockWriter.flush()
+ assert(logBlockWriter.bytesWritten() === logBlockWriter.tmpFile.length())
+ } finally {
+ logBlockWriter.close()
+ }
+ }
+
+ test("SPARK-53755: writeLog/save operations should fail on closed LogBlockWriter") {
+ val logBlockWriter = makeLogBlockWriter()
+ val log = TestLogLine(0L, 1, "Log message 1")
+
+ logBlockWriter.writeLog(log)
+ logBlockWriter.close()
+
+ val exception1 = intercept[SparkException] {
+ logBlockWriter.writeLog(log)
+ }
+ assert(exception1.getMessage.contains("Writer already closed. Cannot write more data."))
+
+ val exception2 = intercept[SparkException] {
+ logBlockWriter.save(TestLogBlockId(0L, "1"))
+ }
+ assert(exception2.getMessage.contains("Writer already closed. Cannot save."))
+ }
+
+ test("SPARK-53755: close writer after saving to block manager") {
+ val log = TestLogLine(0L, 1, "Log message 1")
+
+ Seq(true, false).foreach { success =>
+ val logBlockWriter = spy(makeLogBlockWriter())
+ doAnswer(_ => success).when(logBlockWriter)
+ .saveToBlockManager(any[LogBlockId], anyLong)
+
+ logBlockWriter.writeLog(log)
+ if (success) {
+ logBlockWriter.save(TestLogBlockId(0L, "1"))
+ } else {
+ val exception = intercept[SparkException] {
+ logBlockWriter.save(TestLogBlockId(0L, "1"))
+ }
+ assert(exception.getMessage.contains("Failed to save log block"))
+ }
+
+ verify(logBlockWriter, times(1)).close()
+ }
+ }
+
+ test("SPARK-53755: skip saving to block manager if no logs written") {
+ val logBlockWriter = spy(makeLogBlockWriter())
+ logBlockWriter.save(TestLogBlockId(0L, "1"))
+ assert(logBlockWriter.bytesWritten() === 0L)
+ verify(logBlockWriter, times(0)).saveToBlockManager(any[LogBlockId], anyLong)
+ }
+
+ private def makeLogBlockWriter(): LogBlockWriter = {
+ val serializerManager = new SerializerManager(new JavaSerializer(sparkConf), sparkConf, None)
+ val blockManager = mock(classOf[BlockManager])
+ doAnswer(_ => serializerManager).when(blockManager).serializerManager
+ new LogBlockWriter(
+ blockManager, LogBlockType.TEST, sparkConf)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]