This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 94cccad9e3f2 [SPARK-53755][CORE] Add log support in BlockManager
94cccad9e3f2 is described below

commit 94cccad9e3f226e841329490bdfbf73601880541
Author: Tengfei Huang <[email protected]>
AuthorDate: Mon Oct 20 17:54:19 2025 -0700

    [SPARK-53755][CORE] Add log support in BlockManager
    
    ### What changes were proposed in this pull request?
    
    Add log support in `BlockManager` to help collect and analyze logs for
live debugging. A few new components are introduced to do that:
    
    1. `LogBlockId` to identify a block of log data, which can be used to
filter log data, and `LogBlockType` to identify different kinds of logs.
    2. `LogLine` to define the log structure.
    3. `LogBlockWriter` to write a single log block.
    4. `RollingLogWriter` to write logs in a rolling manner, splitting them
into blocks by (approximate) size.
    
    Usage Example:
    ```
    // Define a blockId generator which can help to generate unique block ids.
    val logBlockIdGenerator = new LogBlockIdGenerator {
        override def logBlockType: LogBlockType = LogBlockType.TEST
    
        override protected def genUniqueBlockId(
            lastLogTime: Long, executorId: String): LogBlockId = {
          TestLogBlockId(lastLogTime, executorId)
        }
    }
    
    // Get a log writer and write logs.
    val logBlockWriter = store.getRollingLogWriter(logBlockIdGenerator)
    logBlockWriter.writeLog(TestLogLine(0L, 1, "Log message 1"))
    logBlockWriter.writeLog(TestLogLine(1L, 2, "Log message 2"))
    logBlockWriter.writeLog(TestLogLine(2L, 3, "Log message 3"))
    logBlockWriter.writeLog(TestLogLine(3L, 4, "Log message 4"))
    
    // Close writer after writing all logs.
    logBlockWriter.close()
    ```
    
    ### Why are the changes needed?
    Collecting and analyzing logs with Spark jobs makes live debugging easier.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit tests added (`BlockManagerSuite`, `LogBlockWriterSuite`).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #52643 from ivoson/SPARK-53755.
    
    Authored-by: Tengfei Huang <[email protected]>
    Signed-off-by: Takuya Ueshin <[email protected]>
---
 .../spark/serializer/SerializerManager.scala       |  15 +-
 .../scala/org/apache/spark/storage/BlockId.scala   |  40 +++++
 .../org/apache/spark/storage/BlockManager.scala    |  32 +++-
 .../apache/spark/storage/LogBlockIdGenerator.scala |  49 ++++++
 .../org/apache/spark/storage/LogBlockWriter.scala  | 182 +++++++++++++++++++++
 .../scala/org/apache/spark/storage/LogLine.scala   |  48 ++++++
 .../apache/spark/storage/RollingLogWriter.scala    | 115 +++++++++++++
 .../apache/spark/storage/BlockManagerSuite.scala   |  79 +++++++++
 .../apache/spark/storage/LogBlockWriterSuite.scala | 137 ++++++++++++++++
 9 files changed, 691 insertions(+), 6 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala 
b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 640396a69526..d53a4d549782 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -172,9 +172,8 @@ private[spark] class SerializerManager(
       outputStream: OutputStream,
       values: Iterator[T]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val autoPick = !blockId.isInstanceOf[StreamBlockId]
-    val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
-    ser.serializeStream(wrapForCompression(blockId, 
byteStream)).writeAll(values).close()
+    blockSerializationStream[T](blockId, byteStream)(implicitly[ClassTag[T]])
+      .writeAll(values).close()
   }
 
   /** Serializes into a chunked byte buffer. */
@@ -212,4 +211,14 @@ private[spark] class SerializerManager(
       .deserializeStream(wrapForCompression(blockId, stream))
       .asIterator.asInstanceOf[Iterator[T]]
   }
+
+  /** Generate a `SerializationStream` for a block. */
+  private[spark] def blockSerializationStream[T](
+      blockId: BlockId,
+      outputStream: OutputStream)
+      (classTag: ClassTag[T]): SerializationStream = {
+    val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val ser = getSerializer(classTag, autoPick).newInstance()
+    ser.serializeStream(wrapForCompression(blockId, outputStream))
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 6eb015d56b2c..a15426783ebe 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -23,6 +23,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.network.shuffle.RemoteBlockPushResolver
+import org.apache.spark.storage.LogBlockType.LogBlockType
 
 /**
  * :: DeveloperApi ::
@@ -175,6 +176,42 @@ case class PythonStreamBlockId(streamId: Int, uniqueId: 
Long) extends BlockId {
   override def name: String = "python-stream-" + streamId + "-" + uniqueId
 }
 
+object LogBlockType extends Enumeration {
+  type LogBlockType = Value
+  val TEST = Value
+}
+
+/**
+ * Identifies a block of log data.
+ *
+ * @param lastLogTime the timestamp of the last log entry in this block, used 
for filtering
+ *                    and log management.
+ * @param executorId the ID of the executor that produced this log block.
+ */
+abstract sealed class LogBlockId(
+    val lastLogTime: Long,
+    val executorId: String) extends BlockId {
+  def logBlockType: LogBlockType
+}
+
+object LogBlockId {
+  def empty(logBlockType: LogBlockType): LogBlockId = {
+    logBlockType match {
+      case LogBlockType.TEST => TestLogBlockId(0L, "")
+      case _ => throw new SparkException(s"Unsupported log block type: 
$logBlockType")
+    }
+  }
+}
+
+// Used for test purpose only.
+case class TestLogBlockId(override val lastLogTime: Long, override val 
executorId: String)
+  extends LogBlockId(lastLogTime, executorId) {
+  override def name: String =
+    "test_log_" + lastLogTime + "_" + executorId
+
+  override def logBlockType: LogBlockType = LogBlockType.TEST
+}
+
 /** Id associated with temporary local data managed as blocks. Not 
serializable. */
 private[spark] case class TempLocalBlockId(id: UUID) extends BlockId {
   override def name: String = "temp_local_" + id
@@ -222,6 +259,7 @@ object BlockId {
   val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
   val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
   val TEST = "test_(.*)".r
+  val TEST_LOG_BLOCK = "test_log_([0-9]+)_(.*)".r
 
   def apply(name: String): BlockId = name match {
     case RDD(rddId, splitIndex) =>
@@ -262,6 +300,8 @@ object BlockId {
       TempLocalBlockId(UUID.fromString(uuid))
     case TEMP_SHUFFLE(uuid) =>
       TempShuffleBlockId(UUID.fromString(uuid))
+    case TEST_LOG_BLOCK(lastLogTime, executorId) =>
+      TestLogBlockId(lastLogTime.toLong, executorId)
     case TEST(value) =>
       TestBlockId(value)
     case _ => throw SparkCoreErrors.unrecognizedBlockIdError(name)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1e1cb8bf9fd5..602e8d068fb6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -58,6 +58,7 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, 
MigratableResolver, ShuffleManager, ShuffleWriteMetricsReporter}
 import 
org.apache.spark.storage.BlockManagerMessages.{DecommissionBlockManager, 
ReplicateBlock}
+import org.apache.spark.storage.LogBlockType.LogBlockType
 import org.apache.spark.storage.memory._
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util._
@@ -294,12 +295,17 @@ private[spark] class BlockManager(
     decommissioner.isDefined
   }
 
-  @inline final private def checkShouldStore(blockId: BlockId) = {
+  @inline final private def checkShouldStore(blockId: BlockId, level: 
StorageLevel) = {
     // Don't reject broadcast blocks since they may be stored during task exec 
and
     // don't need to be migrated.
     if (isDecommissioning() && !blockId.isBroadcast) {
       throw 
SparkCoreErrors.cannotSaveBlockOnDecommissionedExecutorError(blockId)
     }
+    if (blockId.isInstanceOf[LogBlockId] && level != StorageLevel.DISK_ONLY) {
+      throw SparkException.internalError(
+        s"Cannot store log block $blockId with storage level $level. " +
+          "Log blocks must be stored with DISK_ONLY.")
+    }
   }
 
   // This is a lazy val so someone can migrating RDDs even if they don't have 
a MigratableResolver
@@ -763,7 +769,7 @@ private[spark] class BlockManager(
       level: StorageLevel,
       classTag: ClassTag[_]): StreamCallbackWithID = {
 
-    checkShouldStore(blockId)
+    checkShouldStore(blockId, level)
 
     if (blockId.isShuffle) {
       logDebug(s"Putting shuffle block ${blockId}")
@@ -1483,6 +1489,26 @@ private[spark] class BlockManager(
       syncWrites, writeMetrics, blockId)
   }
 
+  /**
+   * To get a log block writer that can write logs directly to a disk block. 
Either `save` or
+   * `close` should be called to finish the writing and release opened 
resources.
+   * `save` would write the block to the block manager, while `close` would 
just close the writer.
+   */
+  def getLogBlockWriter(
+      logBlockType: LogBlockType): LogBlockWriter = {
+    new LogBlockWriter(this, logBlockType, conf)
+  }
+
+  /**
+   * To get a rolling log writer that can write logs to block manager and 
split the logs
+   * to multiple blocks if the log size exceeds the threshold.
+   */
+  def getRollingLogWriter(
+      blockIdGenerator: LogBlockIdGenerator,
+      rollingSize: Long = 33554432L): RollingLogWriter = {
+    new RollingLogWriter(this, blockIdGenerator, rollingSize)
+  }
+
   /**
    * Put a new block of serialized bytes to the block manager.
    *
@@ -1540,7 +1566,7 @@ private[spark] class BlockManager(
 
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
-    checkShouldStore(blockId)
+    checkShouldStore(blockId, level)
 
     val putBlockInfo = {
       val newInfo = new BlockInfo(level, classTag, tellMaster)
diff --git 
a/core/src/main/scala/org/apache/spark/storage/LogBlockIdGenerator.scala 
b/core/src/main/scala/org/apache/spark/storage/LogBlockIdGenerator.scala
new file mode 100644
index 000000000000..4a2b90677ba3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/LogBlockIdGenerator.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.apache.spark.SparkException
+import org.apache.spark.storage.LogBlockType.LogBlockType
+
+/**
+ * LogBlockIdGenerator is responsible for generating unique LogBlockIds for 
log blocks.
+ */
+trait LogBlockIdGenerator {
+  // The log block type that this generator supports.
+  def logBlockType: LogBlockType
+
+  // Generates a unique LogBlockId based on the last log time and executor ID.
+  protected def genUniqueBlockId(lastLogTime: Long, executorId: String): 
LogBlockId
+
+  /**
+   * Generates a new LogBlockId based on the last log time and executor ID. 
Make sure that
+   * the generated LogBlockId has the same log block type as this generator.
+   *
+   * @param lastLogTime The timestamp of the last log entry.
+   * @param executorId The ID of the executor generating the log block.
+   */
+  final def nextBlockId(lastLogTime: Long, executorId: String): LogBlockId = {
+    val blockId = genUniqueBlockId(lastLogTime, executorId)
+    if (blockId.logBlockType != this.logBlockType) {
+      throw SparkException.internalError(
+        "BlockId generated by LogBlockIdGenerator does not match " +
+        s"the expected log block type: ${blockId.logBlockType} != 
${this.logBlockType}")
+    }
+    blockId
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/LogBlockWriter.scala 
b/core/src/main/scala/org/apache/spark/storage/LogBlockWriter.scala
new file mode 100644
index 000000000000..5645f59d383d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/LogBlockWriter.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.BufferedOutputStream
+import java.io.File
+import java.io.FileOutputStream
+
+import org.apache.commons.io.output.CountingOutputStream
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.SerializationStream
+import org.apache.spark.storage.LogBlockType.LogBlockType
+import org.apache.spark.util.Utils
+
+/**
+ * A class for writing logs directly to a file on disk and save as a block in 
BlockManager if
+ * there are any logs written.
+ * `save` or `close` must be called to ensure resources are released properly. 
`save` will add
+ * the log block to BlockManager, while `close` will just release the 
resources without saving
+ * the log block.
+ *
+ * Notes:
+ * - This class does not support concurrent writes.
+ * - `save` always closes the writer, whether or not saving succeeds. A failed `writeLog`
+ *   does not close it; the caller is expected to call `close`.
+ * - Write operations after closing will throw exceptions.
+ */
+private[spark] class LogBlockWriter(
+    blockManager: BlockManager,
+    logBlockType: LogBlockType,
+    sparkConf: SparkConf,
+    bufferSize: Int = 32 * 1024) extends Logging {
+
+  private[storage] var tmpFile: File = null
+
+  private var cos: CountingOutputStream = null
+  private var objOut: SerializationStream = null
+  private var hasBeenClosed = false
+  private var recordsWritten = false
+  private var totalBytesWritten = 0
+
+  initialize()
+
+  private def initialize(): Unit = {
+    try {
+      val dir = new File(Utils.getLocalDir(sparkConf))
+      tmpFile = File.createTempFile(s"spark_log_$logBlockType", "", dir)
+      val fos = new FileOutputStream(tmpFile, false)
+      val bos = new BufferedOutputStream(fos, bufferSize)
+      cos = new CountingOutputStream(bos)
+      val emptyBlockId = LogBlockId.empty(logBlockType)
+      objOut = blockManager
+        .serializerManager
+        .blockSerializationStream(emptyBlockId, 
cos)(LogLine.getClassTag(logBlockType))
+    } catch {
+      case e: Exception =>
+        logError(log"Failed to initialize LogBlockWriter.", e)
+        close()
+        throw e
+    }
+  }
+
+  def bytesWritten(): Int = {
+    Option(cos)
+      .map(_.getCount)
+      .getOrElse(totalBytesWritten)
+  }
+
+  /**
+   * Write a log entry to the log block. Exception will be thrown if the 
writer has been closed
+   * or if there is an error during writing. Caller needs to deal with the 
exception. Suggest to
+   * close the writer when exception is thrown as block data could be 
corrupted which would lead
+   * to issues when reading the log block later.
+   *
+   * @param logEntry The log entry to write.
+   */
+  def writeLog(logEntry: LogLine): Unit = {
+    if (hasBeenClosed) {
+      throw SparkException.internalError(
+        "Writer already closed. Cannot write more data.",
+        category = "STORAGE"
+      )
+    }
+
+    try {
+      objOut.writeObject(logEntry)
+      recordsWritten = true
+    } catch {
+      case e: Exception =>
+        logError(log"Failed to write log entry.", e)
+        throw e
+    }
+  }
+
+  def save(blockId: LogBlockId): Unit = {
+    if (hasBeenClosed) {
+      throw SparkException.internalError(
+        "Writer already closed. Cannot save.",
+        category = "STORAGE"
+      )
+    }
+
+    try {
+      if (blockId.logBlockType != logBlockType) {
+        throw SparkException.internalError(
+          s"LogBlockWriter is for $logBlockType, but got blockId $blockId")
+      }
+
+      objOut.flush()
+      objOut.close()
+      objOut = null
+
+      if(recordsWritten) {
+        totalBytesWritten = cos.getCount
+        // Save log block to BlockManager and delete the tmpFile.
+        val success = saveToBlockManager(blockId, totalBytesWritten)
+        if (!success) {
+          throw SparkException.internalError(s"Failed to save log block 
$blockId to BlockManager")
+        }
+      }
+    } finally {
+      close()
+    }
+  }
+
+  def close(): Unit = {
+    if (hasBeenClosed) {
+      return
+    }
+
+    try {
+      if (objOut != null) {
+        objOut.close()
+      }
+      if (tmpFile != null && tmpFile.exists()) {
+        tmpFile.delete()
+      }
+    } catch {
+      case e: Exception =>
+        logWarning(log"Failed to close resources of LogBlockWriter", e)
+    } finally {
+      objOut = null
+      cos = null
+      hasBeenClosed = true
+    }
+  }
+
+  // For test only.
+  private[storage] def flush(): Unit = {
+    if (objOut != null) {
+      objOut.flush()
+    }
+  }
+
+  private[storage] def saveToBlockManager(blockId: LogBlockId, blockSize: 
Long): Boolean = {
+    blockManager.
+      TempFileBasedBlockStoreUpdater(
+        blockId,
+        StorageLevel.DISK_ONLY,
+        LogLine.getClassTag(logBlockType),
+        tmpFile,
+        blockSize)
+      .save()
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/LogLine.scala 
b/core/src/main/scala/org/apache/spark/storage/LogLine.scala
new file mode 100644
index 000000000000..dc646f289e37
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/LogLine.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.reflect.ClassTag
+import scala.reflect.classTag
+
+/**
+ * Base class representing a log line.
+ *
+ * @param eventTime timestamp in milliseconds when the log is written
+ * @param sequenceId sequence ID of the log line
+ * @param message log message
+ */
+trait LogLine {
+  val eventTime: Long
+  val sequenceId: Long
+  val message: String
+}
+
+object LogLine {
+  def getClassTag(logBlockType: LogBlockType.LogBlockType): 
ClassTag[_<:LogLine] =
+    logBlockType match {
+      case LogBlockType.TEST =>
+        classTag[TestLogLine]
+      case unsupportedLogBlockType =>
+        throw new RuntimeException("Not supported log type " + 
unsupportedLogBlockType)
+    }
+}
+
+case class TestLogLine(eventTime: Long, sequenceId: Long, message: String)
+  extends LogLine {
+}
diff --git 
a/core/src/main/scala/org/apache/spark/storage/RollingLogWriter.scala 
b/core/src/main/scala/org/apache/spark/storage/RollingLogWriter.scala
new file mode 100644
index 000000000000..d925d7b79f46
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/RollingLogWriter.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.LogKeys.{BLOCK_ID, BYTE_SIZE}
+import org.apache.spark.storage.LogBlockType.LogBlockType
+
+/**
+ * Rolling log writer that writes log entries to blocks in a rolling manner. 
Here we split
+ * log blocks based on size limit.
+ *
+ * @param blockManager BlockManager to manage log blocks.
+ * @param blockIdGenerator BlockId generator to generate unique block IDs for 
log blocks.
+ * @param rollingSize Size limit for each log block. Default is 32MB (33554432 
bytes).
+ */
+private[spark] class RollingLogWriter(
+    blockManager: BlockManager,
+    blockIdGenerator: LogBlockIdGenerator,
+    rollingSize: Long = 33554432L) extends Logging {
+  private var currentBlockWriter: Option[LogBlockWriter] = None
+  private var lastLogTime: Long = 0L
+  private val logBlockType: LogBlockType = blockIdGenerator.logBlockType
+
+  private def shouldRollOver: Boolean = {
+    currentBlockWriter match {
+      case Some(writer) => writer.bytesWritten() >= rollingSize
+      case None => false
+    }
+  }
+
+  /**
+   * Write a log entry. If the current block writer is empty, it will create a 
new one.
+   * If the current block exceeds the rolling size, it will roll over to a new 
block for
+   * the next log entry.
+   *
+   * @param logEntry log entry to write.
+   * @param removeBlockOnException if true and the write fails, the current block is deleted
+   *                               without being saved to BlockManager. Otherwise, no action is
+   *                               taken on the current block, which might be corrupted.
+   */
+  def writeLog(logEntry: LogLine, removeBlockOnException: Boolean = false): 
Unit = {
+    // Create a new log writer if it's empty
+    if (currentBlockWriter.isEmpty) {
+      currentBlockWriter = Some(blockManager.getLogBlockWriter(logBlockType))
+    }
+
+    try {
+      currentBlockWriter.foreach { writer =>
+        writer.writeLog(logEntry)
+        lastLogTime = logEntry.eventTime
+      }
+    } catch {
+      case e: Exception =>
+        if (removeBlockOnException) {
+          logError(log"Failed to write log, closing block without saving.", e)
+          currentBlockWriter.foreach(_.close())
+          currentBlockWriter = None
+        }
+
+        throw e
+    }
+
+    if (shouldRollOver) {
+      rollOver()
+    }
+  }
+
+  def rollOver(): Unit = {
+    // Save current block and reset the writer
+    try {
+      saveCurrentBlock()
+    } finally {
+      currentBlockWriter = None
+    }
+  }
+
+  def close(): Unit = {
+    try {
+      saveCurrentBlock()
+    } finally {
+      currentBlockWriter = None
+      lastLogTime = 0L
+    }
+  }
+
+  // For test purpose.
+  private[storage] def flush(): Unit = {
+    currentBlockWriter.foreach(_.flush())
+  }
+
+  private def saveCurrentBlock(): Unit = {
+    currentBlockWriter.foreach { writer =>
+      val blockId = blockIdGenerator.nextBlockId(lastLogTime, 
blockManager.executorId)
+      logInfo(log"Saving log block ${MDC(BLOCK_ID, blockId)} with " +
+        log"approximate size: ${MDC(BYTE_SIZE, writer.bytesWritten())} bytes.")
+      writer.save(blockId)
+    }
+  }
+}
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 5b86345dd5f9..95a315a486ff 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -64,6 +64,8 @@ import org.apache.spark.serializer.{DeserializationStream, 
JavaSerializer, KryoD
 import org.apache.spark.shuffle.{MigratableResolver, ShuffleBlockInfo, 
ShuffleBlockResolver, ShuffleManager}
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.storage.LogBlockType.LogBlockType
+import org.apache.spark.storage.StorageLevel._
 import org.apache.spark.util._
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.collection.Utils.createArray
@@ -2504,6 +2506,83 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with PrivateMethodTe
     assert(acc.value === 6)
   }
 
+  test("SPARK-53755: LogBlock should be DISK_ONLY") {
+    val store = makeBlockManager(8000, "executor1")
+    val data = Seq("log line 1", "log line 2")
+    val logBlockId = TestLogBlockId(1234L, store.executorId)
+
+    Seq(DISK_ONLY_2, DISK_ONLY_3,
+      MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_ONLY_SER,
+      MEMORY_ONLY_SER_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2,
+      MEMORY_AND_DISK_SER, MEMORY_AND_DISK_SER_2, OFF_HEAP).foreach { level =>
+      val exception = intercept[SparkException] {
+        store.putIterator[String](logBlockId, data.iterator, level, tellMaster 
= true)
+      }
+      assert(exception.getMessage.contains("Log blocks must be stored with 
DISK_ONLY."))
+    }
+
+    assert(store.putIterator[String](logBlockId, data.iterator, DISK_ONLY, 
tellMaster = true))
+  }
+
+  test("SPARK-53755: Log block write/read") {
+    val store = makeBlockManager(8000, "executor1")
+    val logBlockWriter = store.getLogBlockWriter(LogBlockType.TEST)
+    val logBlockId = TestLogBlockId(1L, store.executorId)
+    val log1 = TestLogLine(0L, 1, "Log message 1")
+    val log2 = TestLogLine(1L, 2, "Log message 2")
+
+    logBlockWriter.writeLog(log1)
+    logBlockWriter.writeLog(log2)
+    logBlockWriter.save(logBlockId)
+
+    val status = store.getStatus(logBlockId)
+    assert(status.isDefined)
+    status.foreach { s =>
+      assert(s.storageLevel === DISK_ONLY)
+      assert(s.memSize === 0)
+      assert(s.diskSize > 0)
+    }
+
+    val data = store.get[TestLogLine](logBlockId).get.data.toSeq
+    assert(data === Seq(log1, log2))
+  }
+
+  test("SPARK-53755: rolling log block write/read") {
+    val store = makeBlockManager(8000, "executor1")
+
+    val logBlockIdGenerator = new LogBlockIdGenerator {
+      override def logBlockType: LogBlockType = LogBlockType.TEST
+
+      override protected def genUniqueBlockId(
+          lastLogTime: Long, executorId: String): LogBlockId = {
+        TestLogBlockId(lastLogTime, executorId)
+      }
+    }
+
+    val logBlockWriter = store.getRollingLogWriter(logBlockIdGenerator, 100)
+    val log1 = TestLogLine(0L, 1, "Log message 1")
+    val log2 = TestLogLine(1L, 2, "Log message 2")
+    val log3 = TestLogLine(2L, 3, "Log message 3")
+    val log4 = TestLogLine(3L, 4, "Log message 4")
+
+    // 65 bytes for each log line, 2 log lines for each block
+    logBlockWriter.writeLog(log1)
+    logBlockWriter.writeLog(log2)
+    // Flush and update bytes written, so that the next write will go to a new 
block.
+    logBlockWriter.flush()
+    logBlockWriter.writeLog(log3)
+    logBlockWriter.writeLog(log4)
+    logBlockWriter.close()
+
+    val logBlockId1 = TestLogBlockId(2L, store.executorId)
+    val logBlockId2 = TestLogBlockId(3L, store.executorId)
+    val logBlockIds = store
+      .getMatchingBlockIds(_.isInstanceOf[TestLogBlockId])
+      .distinct
+    assert(logBlockIds.size === 2)
+    assert(logBlockIds.contains(logBlockId1) && 
logBlockIds.contains(logBlockId2))
+  }
+
   private def createKryoSerializerWithDiskCorruptedInputStream(): 
KryoSerializer = {
     class TestDiskCorruptedInputStream extends InputStream {
       override def read(): Int = throw new IOException("Input/output error")
diff --git 
a/core/src/test/scala/org/apache/spark/storage/LogBlockWriterSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/LogBlockWriterSuite.scala
new file mode 100644
index 000000000000..2e06e5d150d5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/LogBlockWriterSuite.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.storage
+
+import java.io.File
+import java.nio.file.Files
+
+import org.mockito.ArgumentMatchers.{any, anyLong}
+import org.mockito.Mockito.{doAnswer, doThrow, mock, spy, times, verify}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
+import org.apache.spark.util.Utils
+
+class LogBlockWriterSuite extends SparkFunSuite {
+  var tempDir: File = _
+  var sparkConf: SparkConf = _
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    tempDir = Utils.createTempDir()
+    sparkConf = new SparkConf(false)
+      .set("spark.local.dir", tempDir.getAbsolutePath)
+  }
+
+  override def afterEach(): Unit = {
+    try {
+      Utils.deleteRecursively(tempDir)
+    } finally {
+      Utils.clearLocalRootDirs()
+      super.afterEach()
+    }
+  }
+
+  test("SPARK-53755: close resources when failed to initialize") {
+    val blockManager = mock(classOf[BlockManager])
+    val serializerManager =
+      spy(new SerializerManager(new JavaSerializer(sparkConf), sparkConf, 
None))
+    doAnswer(_ => serializerManager).when(blockManager).serializerManager
+    doThrow(new RuntimeException("Initialization failed"))
+      .when(serializerManager).blockSerializationStream(any, any)(any)
+
+    intercept[RuntimeException] {
+      new LogBlockWriter(
+        blockManager, LogBlockType.TEST, sparkConf)
+    }
+    verify(serializerManager, times(1)).blockSerializationStream(any, any)(any)
+    val leafFiles = Files.walk(tempDir.toPath)
+      .filter(Files.isRegularFile(_))
+      .toArray
+    assert(leafFiles.isEmpty, "Temporary file should be deleted.")
+  }
+
+  test("SPARK-53755: bytes written stats") {
+    val logBlockWriter = makeLogBlockWriter()
+
+    val log1 = TestLogLine(0L, 1, "Log message 1")
+    val log2 = TestLogLine(1L, 2, "Log message 2")
+    try {
+      logBlockWriter.writeLog(log1)
+      logBlockWriter.writeLog(log2)
+      logBlockWriter.flush()
+      assert(logBlockWriter.bytesWritten() === logBlockWriter.tmpFile.length())
+    } finally {
+      logBlockWriter.close()
+    }
+  }
+
+  test("SPARK-53755: writeLog/save operations should fail on closed 
LogBlockWriter") {
+    val logBlockWriter = makeLogBlockWriter()
+    val log = TestLogLine(0L, 1, "Log message 1")
+
+    logBlockWriter.writeLog(log)
+    logBlockWriter.close()
+
+    val exception1 = intercept[SparkException] {
+      logBlockWriter.writeLog(log)
+    }
+    assert(exception1.getMessage.contains("Writer already closed. Cannot write 
more data."))
+
+    val exception2 = intercept[SparkException] {
+      logBlockWriter.save(TestLogBlockId(0L, "1"))
+    }
+    assert(exception2.getMessage.contains("Writer already closed. Cannot 
save."))
+  }
+
+  test("SPARK-53755: close writer after saving to block manager") {
+    val log = TestLogLine(0L, 1, "Log message 1")
+
+    Seq(true, false).foreach { success =>
+      val logBlockWriter = spy(makeLogBlockWriter())
+      doAnswer(_ => success).when(logBlockWriter)
+        .saveToBlockManager(any[LogBlockId], anyLong)
+
+      logBlockWriter.writeLog(log)
+      if (success) {
+        logBlockWriter.save(TestLogBlockId(0L, "1"))
+      } else {
+        val exception = intercept[SparkException] {
+          logBlockWriter.save(TestLogBlockId(0L, "1"))
+        }
+        assert(exception.getMessage.contains("Failed to save log block"))
+      }
+
+      verify(logBlockWriter, times(1)).close()
+    }
+  }
+
+  test("SPARK-53755: skip saving to block manager if no logs written") {
+    val logBlockWriter = spy(makeLogBlockWriter())
+    logBlockWriter.save(TestLogBlockId(0L, "1"))
+    assert(logBlockWriter.bytesWritten() === 0L)
+    verify(logBlockWriter, times(0)).saveToBlockManager(any[LogBlockId], 
anyLong)
+  }
+
+  private def makeLogBlockWriter(): LogBlockWriter = {
+    val serializerManager = new SerializerManager(new 
JavaSerializer(sparkConf), sparkConf, None)
+    val blockManager = mock(classOf[BlockManager])
+    doAnswer(_ => serializerManager).when(blockManager).serializerManager
+    new LogBlockWriter(
+      blockManager, LogBlockType.TEST, sparkConf)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to