This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new ffbb1c12f47f [SPARK-51717][SS][ROCKSDB] Fix SST mismatch corruption that can happen for second snapshot created for a new query
ffbb1c12f47f is described below

commit ffbb1c12f47fc203897b9f000fec279a8606d03f
Author: micheal-o <micheal.okut...@gmail.com>
AuthorDate: Wed Apr 9 12:59:21 2025 +0900

    [SPARK-51717][SS][ROCKSDB] Fix SST mismatch corruption that can happen for second snapshot created for a new query
    
    ### What changes were proposed in this pull request?
    Fix error: Sst file size mismatch ... MANIFEST-000005 may be corrupted.
    
    This is an edge case in SST file reuse that can only happen for the first ever RocksDB checkpoint, if the following conditions hold:
    
    1. The first ever RocksDB checkpoint (e.g. for version 10) was created with x.sst, but not yet uploaded by maintenance.
    2. The next batch using RocksDB at v10 fails and rolls back the store to -1 (invalidating RocksDB).
    3. A new request to load RocksDB at v10 comes in, but the v10 checkpoint is still not uploaded, hence we have to replay the changelog starting from checkpoint v0.
    4. We create a new v11 and a new checkpoint with a new x*.sst. v10 is now uploaded by maintenance. Then, during the upload of x*.sst for v11, we reuse the x.sst DFS file, thinking it is the same as x*.sst.
    
    The problem stems from step 3: the file manager loads v0 differently from other versions. When loading any other version, deleting an existing local file also removes it from the file mapping, but for v0 the file manager just deletes the local dir, and we missed clearing the file mapping in that case. Hence the old x.sst was still present in the file mapping at step 4. This change fixes that and also adds an additional file size check, as sketched below.
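
    Condensed sketch of the two-part fix (distilled from the patch in the diff below; the unchanged branch is elided with `???`, see the patch for the exact code):

    ```scala
    // Part 1 (RocksDBFileManager): loading version 0 wipes the local dir, so the
    // stale local-to-DFS file mapping must be wiped along with it.
    val metadata = if (version == 0) {
      if (localDir.exists) Utils.deleteRecursively(localDir)
      localDir.mkdirs()
      // Previously missing: without this, the old x.sst entry stayed in the mapping.
      rocksDBFileMapping.clear()
      RocksDBCheckpointMetadata(Seq.empty, 0)
    } else {
      // Unchanged path: deleting an existing local file here also removes it
      // from the file mapping.
      ??? // elided, see the patch below
    }

    // Part 2 (RocksDBFileMapping): when saving, a mapped DFS file is rejected if it
    // was added in the same or a newer version OR its size differs from the local file.
    getDfsFileWithIncompatibilityCheck(
      fileManager,
      localFile.getName,
      (dfsFileVersion, dfsFile) =>
        dfsFileVersion >= versionToSave || dfsFile.sizeBytes != localFile.length()
    )
    ```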
    
    ### Why are the changes needed?
    This can cause checkpoint corruption, which makes the query fail.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    New test included
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #50512 from micheal-o/file_reuse_bug_for_new_query.
    
    Authored-by: micheal-o <micheal.okut...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 3f16577c21998c2940aa2f13c21834290aa6ea29)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../scala/org/apache/spark/internal/LogKey.scala   |  1 +
 .../sql/execution/streaming/state/RocksDB.scala    | 46 +++++++++++++++++-----
 .../streaming/state/RocksDBFileManager.scala       |  6 +++
 .../execution/streaming/state/RocksDBSuite.scala   | 46 ++++++++++++++++++++++
 4 files changed, 90 insertions(+), 9 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 251daf8a52c1..448061fbf1bb 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -703,6 +703,7 @@ private[spark] object LogKeys {
   case object RIGHT_EXPR extends LogKey
   case object RIGHT_LOGICAL_PLAN_STATS_SIZE_IN_BYTES extends LogKey
   case object RMSE extends LogKey
+  case object ROCKS_DB_FILE_MAPPING extends LogKey
   case object ROCKS_DB_LOG_LEVEL extends LogKey
   case object ROCKS_DB_LOG_MESSAGE extends LogKey
   case object RPC_ADDRESS extends LogKey
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 6cd06e14f74f..cfabc6f5ffbc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -1109,6 +1109,10 @@ class RocksDB(
 
       val (dfsFileSuffix, immutableFileMapping) = rocksDBFileMapping.createSnapshotFileMapping(
         fileManager, checkpointDir, version)
+      logInfo(log"RocksDB file mapping after creating snapshot file mapping for version " +
+        log"${MDC(LogKeys.VERSION_NUM, version)}:\n" +
+        log"${MDC(LogKeys.ROCKS_DB_FILE_MAPPING, rocksDBFileMapping)}")
+
       val newSnapshot = Some(RocksDBSnapshot(
         checkpointDir,
         version,
@@ -1574,6 +1578,16 @@ class RocksDBFileMapping {
   // from reusing SST files which have not been yet persisted to DFS,
  val snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo] = ConcurrentHashMap.newKeySet()
 
+  /**
+   * Clear everything stored in the file mapping.
+   */
+  def clear(): Unit = {
+    localFileMappings.clear()
+    snapshotsPendingUpload.clear()
+  }
+
+  override def toString: String = localFileMappings.toString()
+
   /**
    * Get the mapped DFS file for the given local file for a DFS load operation.
    * If the currently mapped DFS file was mapped in the same or newer version as the version we
@@ -1590,14 +1604,21 @@ class RocksDBFileMapping {
       fileManager: RocksDBFileManager,
       localFileName: String,
       versionToLoad: Long): Option[RocksDBImmutableFile] = {
-    getDfsFileWithVersionCheck(fileManager, localFileName, _ >= versionToLoad)
+    getDfsFileWithIncompatibilityCheck(
+      fileManager,
+      localFileName,
+      // We can't reuse the current local file since it was added in the same or newer version
+      // as the version we want to load
+      (fileVersion, _) => fileVersion >= versionToLoad
+    )
   }
 
   /**
    * Get the mapped DFS file for the given local file for a DFS save (i.e. checkpoint) operation.
    * If the currently mapped DFS file was mapped in the same or newer version as the version we
-   * want to save (or was generated in a version which has not been uploaded to DFS yet),
-   * the mapped DFS file is ignored. In this scenario, the local mapping to this DFS file
+   * want to save (or was generated in a version which has not been uploaded to DFS yet)
+   * or the mapped dfs file isn't the same size as the local file,
+   * then the mapped DFS file is ignored. In this scenario, the local mapping to this DFS file
    * will be cleared, and function will return None.
    *
    * @note If the file was added in current version (i.e. versionToSave - 1), we can reuse it.
@@ -1608,19 +1629,26 @@ class RocksDBFileMapping {
    */
   private def getDfsFileForSave(
       fileManager: RocksDBFileManager,
-      localFileName: String,
+      localFile: File,
       versionToSave: Long): Option[RocksDBImmutableFile] = {
-    getDfsFileWithVersionCheck(fileManager, localFileName, _ >= versionToSave)
+    getDfsFileWithIncompatibilityCheck(
+      fileManager,
+      localFile.getName,
+      (dfsFileVersion, dfsFile) =>
+        // The DFS file is not the same as the file we want to save, either if
+        // the DFS file was added in the same or higher version, or the file size is different
+        dfsFileVersion >= versionToSave || dfsFile.sizeBytes != localFile.length()
+    )
   }
 
-  private def getDfsFileWithVersionCheck(
+  private def getDfsFileWithIncompatibilityCheck(
       fileManager: RocksDBFileManager,
       localFileName: String,
-      isIncompatibleVersion: Long => Boolean): Option[RocksDBImmutableFile] = {
+      isIncompatible: (Long, RocksDBImmutableFile) => Boolean): Option[RocksDBImmutableFile] = {
    localFileMappings.get(localFileName).map { case (dfsFileMappedVersion, dfsFile) =>
       val dfsFileSuffix = fileManager.dfsFileSuffix(dfsFile)
      val versionSnapshotInfo = RocksDBVersionSnapshotInfo(dfsFileMappedVersion, dfsFileSuffix)
-      if (isIncompatibleVersion(dfsFileMappedVersion) ||
+      if (isIncompatible(dfsFileMappedVersion, dfsFile) ||
         snapshotsPendingUpload.contains(versionSnapshotInfo)) {
         // the mapped dfs file cannot be used, delete from mapping
         remove(localFileName)
@@ -1662,7 +1690,7 @@ class RocksDBFileMapping {
     val dfsFilesSuffix = UUID.randomUUID().toString
     val snapshotFileMapping = localImmutableFiles.map { f =>
       val localFileName = f.getName
-      val existingDfsFile = getDfsFileForSave(fileManager, localFileName, version)
+      val existingDfsFile = getDfsFileForSave(fileManager, f, version)
       val dfsFile = existingDfsFile.getOrElse {
        val newDfsFileName = fileManager.newDFSFileName(localFileName, dfsFilesSuffix)
        val newDfsFile = RocksDBImmutableFile(localFileName, newDfsFileName, sizeBytes = f.length())
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 71d6ec3d4da9..8ca5a2171aaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -323,6 +323,8 @@ class RocksDBFileManager(
     val metadata = if (version == 0) {
       if (localDir.exists) Utils.deleteRecursively(localDir)
       localDir.mkdirs()
+      // Since we cleared the local dir, we should also clear the local file mapping
+      rocksDBFileMapping.clear()
       RocksDBCheckpointMetadata(Seq.empty, 0)
     } else {
      // Delete all non-immutable files in local dir, and unzip new ones from DFS commit file
@@ -341,6 +343,10 @@ class RocksDBFileManager(
     }
     logFilesInDir(localDir, log"Loaded checkpoint files " +
       log"for version ${MDC(LogKeys.VERSION_NUM, version)}")
+    logInfo(log"RocksDB file mapping after loading checkpoint version " +
+      log"${MDC(LogKeys.VERSION_NUM, version)} from DFS:\n" +
+      log"${MDC(LogKeys.ROCKS_DB_FILE_MAPPING, rocksDBFileMapping)}")
+
     metadata
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index b5a2ec43001a..f9b58fe31578 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -3211,6 +3211,52 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
     }
   }
 
+  testWithChangelogCheckpointingEnabled(
+    "SPARK-51717 - validate that RocksDB file mapping is cleared " +
+      "when we reload version 0 after we have created a snapshot to avoid SST 
mismatch") {
+    withTempDir { dir =>
+      val conf = dbConf.copy(minDeltasForSnapshot = 2)
+      val hadoopConf = new Configuration()
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db =>
+        db.load(0)
+        db.put("a", "1")
+        db.put("b", "1")
+        db.commit()
+
+        db.load(1)
+        db.put("a", "1")
+        db.commit() // we will create a snapshot for v2
+
+        // invalidate the db, so next load will reload from dfs
+        db.rollback()
+
+        // We will replay changelog from 0 -> 2 since the v2 snapshot hasn't been uploaded yet.
+        // We had a bug where the file mapping was not cleared when we start from v0 again,
+        // hence files of the v2 snapshot were being reused, if the v2 snapshot was uploaded
+        // after this load(2) but before the v3 snapshot.
+        db.load(2)
+        // add a larger row to make sure new sst size is different
+        db.put("b", "1555315569874537247638950872648")
+
+        // now upload v2 snapshot
+        db.doMaintenance()
+
+        // we will create a snapshot for v3. We shouldn't reuse files of v2 snapshot,
+        // given that v3 was not created from v2 snapshot since we replayed changelog from 0 -> 2
+        db.commit()
+
+        db.doMaintenance() // upload v3 snapshot
+
+        // invalidate the db, so next load will reload from dfs
+        db.rollback()
+
+        // loading v3 from dfs should be successful and no SST mismatch error
+        db.load(3)
+      }
+    }
+  }
+
   test("ensure local files deleted on filesystem" +
     " are cleaned from dfs file mapping") {
     def getSSTFiles(dir: File): Set[File] = {

