This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new ffbb1c12f47f  [SPARK-51717][SS][ROCKSDB] Fix SST mismatch corruption that can happen for second snapshot created for a new query
ffbb1c12f47f is described below

commit ffbb1c12f47fc203897b9f000fec279a8606d03f
Author: micheal-o <micheal.okut...@gmail.com>
AuthorDate: Wed Apr 9 12:59:21 2025 +0900

    [SPARK-51717][SS][ROCKSDB] Fix SST mismatch corruption that can happen for second snapshot created for a new query

    ### What changes were proposed in this pull request?

    Fix error: "Sst file size mismatch ... MANIFEST-000005 may be corrupted."

    This is an edge case in SST file reuse that can only happen for the first ever RocksDB checkpoint, when the following conditions hold:
    1. The first ever RocksDB checkpoint (e.g. for version 10) was created with x.sst, but not yet uploaded by maintenance.
    2. The next batch using RocksDB at v10 fails and rolls the store back to -1 (invalidates RocksDB).
    3. A new request to load RocksDB at v10 comes in, but the v10 checkpoint is still not uploaded, hence we have to replay the changelog starting from checkpoint v0.
    4. We create a new v11 and a new checkpoint with a new x*.sst. v10 is now uploaded by maintenance. Then, during the upload of x*.sst for v11, we reuse the x.sst DFS file, thinking it is the same as x*.sst.

    The problem stems from step 3: the file manager loads v0 differently from how it loads other versions. During the load of other versions, when we delete an existing local file we also delete it from the file mapping. But for v0, the file manager just deletes the local dir, and we missed clearing the file mapping in that case. Hence the old x.sst entry was still present in the file mapping at step 4. This change fixes that and also adds an additional file size check.

    ### Why are the changes needed?

    This can cause checkpoint corruption, hence the query will fail.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    New test included

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #50512 from micheal-o/file_reuse_bug_for_new_query.
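The failure sequence above condenses into a minimal, self-contained Scala sketch. StaleMappingRepro and its simplified map are illustrative stand-ins for Spark's RocksDBFileMapping (which tracks more state than this); only the mapping-survives-the-v0-wipe behavior is modeled:

    import scala.collection.mutable

    object StaleMappingRepro {
      // local SST name -> (version the mapping was recorded at, DFS file name)
      val localFileMappings = mutable.Map[String, (Long, String)]()

      // Models the version-0 load path: the real code wipes the local dir
      // (Utils.deleteRecursively); before the fix, the mapping survived that wipe.
      def loadVersionZero(clearMapping: Boolean): Unit = {
        if (clearMapping) localFileMappings.clear()
      }

      def main(args: Array[String]): Unit = {
        localFileMappings("x.sst") = (10L, "x-uuidA.sst") // step 1: v10 snapshot, not yet uploaded
        loadVersionZero(clearMapping = true)              // step 3: replay from v0 ('false' reproduces the bug)
        // Step 4: with a stale entry still present, a v11 save would reuse
        // x-uuidA.sst for a physically different local x.sst -> SST size mismatch.
        assert(localFileMappings.isEmpty)
      }
    }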
Authored-by: micheal-o <micheal.okut...@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
(cherry picked from commit 3f16577c21998c2940aa2f13c21834290aa6ea29)
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../scala/org/apache/spark/internal/LogKey.scala   |  1 +
 .../sql/execution/streaming/state/RocksDB.scala    | 46 +++++++++++++++++-----
 .../streaming/state/RocksDBFileManager.scala       |  6 +++
 .../execution/streaming/state/RocksDBSuite.scala   | 46 ++++++++++++++++++++++
 4 files changed, 90 insertions(+), 9 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 251daf8a52c1..448061fbf1bb 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -703,6 +703,7 @@ private[spark] object LogKeys {
   case object RIGHT_EXPR extends LogKey
   case object RIGHT_LOGICAL_PLAN_STATS_SIZE_IN_BYTES extends LogKey
   case object RMSE extends LogKey
+  case object ROCKS_DB_FILE_MAPPING extends LogKey
   case object ROCKS_DB_LOG_LEVEL extends LogKey
   case object ROCKS_DB_LOG_MESSAGE extends LogKey
   case object RPC_ADDRESS extends LogKey
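The new ROCKS_DB_FILE_MAPPING key is consumed through Spark's structured-logging interpolator, as in the logInfo calls added in the hunks below. A minimal sketch of that usage, assuming a class that mixes in org.apache.spark.internal.Logging (FileMappingLogger and logMapping are illustrative names, not part of this change):

    import org.apache.spark.internal.{Logging, LogKeys, MDC}

    class FileMappingLogger extends Logging {
      // The MDC value is rendered via toString, which is why this change also
      // gives RocksDBFileMapping a toString override.
      def logMapping(version: Long, mapping: AnyRef): Unit = {
        logInfo(log"RocksDB file mapping for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" +
          log"${MDC(LogKeys.ROCKS_DB_FILE_MAPPING, mapping)}")
      }
    }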
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 6cd06e14f74f..cfabc6f5ffbc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -1109,6 +1109,10 @@ class RocksDB(
     val (dfsFileSuffix, immutableFileMapping) = rocksDBFileMapping.createSnapshotFileMapping(
       fileManager, checkpointDir, version)
 
+    logInfo(log"RocksDB file mapping after creating snapshot file mapping for version " +
+      log"${MDC(LogKeys.VERSION_NUM, version)}:\n" +
+      log"${MDC(LogKeys.ROCKS_DB_FILE_MAPPING, rocksDBFileMapping)}")
+
     val newSnapshot = Some(RocksDBSnapshot(
       checkpointDir,
       version,
@@ -1574,6 +1578,16 @@ class RocksDBFileMapping {
   // from reusing SST files which have not been yet persisted to DFS,
   val snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo] = ConcurrentHashMap.newKeySet()
 
+  /**
+   * Clear everything stored in the file mapping.
+   */
+  def clear(): Unit = {
+    localFileMappings.clear()
+    snapshotsPendingUpload.clear()
+  }
+
+  override def toString: String = localFileMappings.toString()
+
   /**
    * Get the mapped DFS file for the given local file for a DFS load operation.
    * If the currently mapped DFS file was mapped in the same or newer version as the version we
@@ -1590,14 +1604,21 @@ class RocksDBFileMapping {
       fileManager: RocksDBFileManager,
       localFileName: String,
       versionToLoad: Long): Option[RocksDBImmutableFile] = {
-    getDfsFileWithVersionCheck(fileManager, localFileName, _ >= versionToLoad)
+    getDfsFileWithIncompatibilityCheck(
+      fileManager,
+      localFileName,
+      // We can't reuse the current local file since it was added in the same or newer version
+      // as the version we want to load
+      (fileVersion, _) => fileVersion >= versionToLoad
+    )
   }
 
   /**
    * Get the mapped DFS file for the given local file for a DFS save (i.e. checkpoint) operation.
    * If the currently mapped DFS file was mapped in the same or newer version as the version we
-   * want to save (or was generated in a version which has not been uploaded to DFS yet),
-   * the mapped DFS file is ignored. In this scenario, the local mapping to this DFS file
+   * want to save (or was generated in a version which has not been uploaded to DFS yet)
+   * or the mapped dfs file isn't the same size as the local file,
+   * then the mapped DFS file is ignored. In this scenario, the local mapping to this DFS file
    * will be cleared, and function will return None.
    *
    * @note If the file was added in current version (i.e. versionToSave - 1), we can reuse it.
@@ -1608,19 +1629,26 @@ class RocksDBFileMapping {
    */
   private def getDfsFileForSave(
       fileManager: RocksDBFileManager,
-      localFileName: String,
+      localFile: File,
       versionToSave: Long): Option[RocksDBImmutableFile] = {
-    getDfsFileWithVersionCheck(fileManager, localFileName, _ >= versionToSave)
+    getDfsFileWithIncompatibilityCheck(
+      fileManager,
+      localFile.getName,
+      (dfsFileVersion, dfsFile) =>
+        // The DFS file is not the same as the file we want to save, either if
+        // the DFS file was added in the same or higher version, or the file size is different
+        dfsFileVersion >= versionToSave || dfsFile.sizeBytes != localFile.length()
+    )
   }
 
-  private def getDfsFileWithVersionCheck(
+  private def getDfsFileWithIncompatibilityCheck(
       fileManager: RocksDBFileManager,
       localFileName: String,
-      isIncompatibleVersion: Long => Boolean): Option[RocksDBImmutableFile] = {
+      isIncompatible: (Long, RocksDBImmutableFile) => Boolean): Option[RocksDBImmutableFile] = {
     localFileMappings.get(localFileName).map { case (dfsFileMappedVersion, dfsFile) =>
       val dfsFileSuffix = fileManager.dfsFileSuffix(dfsFile)
       val versionSnapshotInfo = RocksDBVersionSnapshotInfo(dfsFileMappedVersion, dfsFileSuffix)
-      if (isIncompatibleVersion(dfsFileMappedVersion) ||
+      if (isIncompatible(dfsFileMappedVersion, dfsFile) ||
           snapshotsPendingUpload.contains(versionSnapshotInfo)) {
         // the mapped dfs file cannot be used, delete from mapping
         remove(localFileName)
@@ -1662,7 +1690,7 @@ class RocksDBFileMapping {
     val dfsFilesSuffix = UUID.randomUUID().toString
     val snapshotFileMapping = localImmutableFiles.map { f =>
       val localFileName = f.getName
-      val existingDfsFile = getDfsFileForSave(fileManager, localFileName, version)
+      val existingDfsFile = getDfsFileForSave(fileManager, f, version)
       val dfsFile = existingDfsFile.getOrElse {
         val newDfsFileName = fileManager.newDFSFileName(localFileName, dfsFilesSuffix)
         val newDfsFile = RocksDBImmutableFile(localFileName, newDfsFileName, sizeBytes = f.length())
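The save-path check introduced above boils down to a small reuse predicate. The sketch below is a hedged paraphrase, with SaveReuseCheck and ImmutableFile as simplified stand-ins for the real types; the actual method additionally rejects files whose snapshot is still pending upload:

    import java.io.File

    object SaveReuseCheck {
      // Simplified stand-in for RocksDBImmutableFile.
      case class ImmutableFile(dfsFileName: String, sizeBytes: Long)

      def canReuseForSave(
          mappedVersion: Long,    // version at which the local -> DFS mapping was recorded
          dfsFile: ImmutableFile, // DFS file currently mapped to the local SST
          localFile: File,        // local SST about to be checkpointed
          versionToSave: Long): Boolean = {
        // Reuse only if the mapping predates the version being saved AND the
        // sizes agree; a size mismatch is the corruption signature fixed here.
        mappedVersion < versionToSave && dfsFile.sizeBytes == localFile.length()
      }
    }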
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 71d6ec3d4da9..8ca5a2171aaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -323,6 +323,8 @@ class RocksDBFileManager(
     val metadata = if (version == 0) {
       if (localDir.exists) Utils.deleteRecursively(localDir)
       localDir.mkdirs()
+      // Since we cleared the local dir, we should also clear the local file mapping
+      rocksDBFileMapping.clear()
       RocksDBCheckpointMetadata(Seq.empty, 0)
     } else {
       // Delete all non-immutable files in local dir, and unzip new ones from DFS commit file
@@ -341,6 +343,10 @@
     }
 
     logFilesInDir(localDir, log"Loaded checkpoint files " +
       log"for version ${MDC(LogKeys.VERSION_NUM, version)}")
+    logInfo(log"RocksDB file mapping after loading checkpoint version " +
+      log"${MDC(LogKeys.VERSION_NUM, version)} from DFS:\n" +
+      log"${MDC(LogKeys.ROCKS_DB_FILE_MAPPING, rocksDBFileMapping)}")
+
     metadata
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index b5a2ec43001a..f9b58fe31578 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -3211,6 +3211,52 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
     }
   }
 
+  testWithChangelogCheckpointingEnabled(
+    "SPARK-51717 - validate that RocksDB file mapping is cleared " +
+      "when we reload version 0 after we have created a snapshot to avoid SST mismatch") {
+    withTempDir { dir =>
+      val conf = dbConf.copy(minDeltasForSnapshot = 2)
+      val hadoopConf = new Configuration()
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db =>
+        db.load(0)
+        db.put("a", "1")
+        db.put("b", "1")
+        db.commit()
+
+        db.load(1)
+        db.put("a", "1")
+        db.commit() // we will create a snapshot for v2
+
+        // invalidate the db, so next load will reload from dfs
+        db.rollback()
+
+        // We will replay changelog from 0 -> 2 since the v2 snapshot hasn't been uploaded yet.
+        // We had a bug where the file mapping was not cleared when we start from v0 again,
+        // hence files of the v2 snapshot were being reused, if the v2 snapshot is uploaded
+        // after this load(2) but before the v3 snapshot
+        db.load(2)
+        // add a larger row to make sure the new sst size is different
+        db.put("b", "1555315569874537247638950872648")
+
+        // now upload v2 snapshot
+        db.doMaintenance()
+
+        // we will create a snapshot for v3. We shouldn't reuse files of the v2 snapshot,
+        // given that v3 was not created from the v2 snapshot since we replayed changelog from 0 -> 2
+        db.commit()
+
+        db.doMaintenance() // upload v3 snapshot
+
+        // invalidate the db, so next load will reload from dfs
+        db.rollback()
+
+        // loading v3 from dfs should be successful with no SST mismatch error
+        db.load(3)
+      }
+    }
+  }
+
   test("ensure local files deleted on filesystem" +
     " are cleaned from dfs file mapping") {
     def getSSTFiles(dir: File): Set[File] = {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org