This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e991caf1672f [SPARK-54072][SS] Ensure that we don't upload empty files in RocksDB snapshot
e991caf1672f is described below

commit e991caf1672f856708e01724a9407d15cb81746d
Author: micheal-o <[email protected]>
AuthorDate: Wed Oct 29 13:58:17 2025 -0700

    [SPARK-54072][SS] Ensure that we don't upload empty files in RocksDB snapshot
    
    ### What changes were proposed in this pull request?
    This change makes sure the RocksDB snapshot zip file doesn't contain
    empty files (except the RocksDB log file, which can legitimately be
    empty). Other files, such as the OPTIONS, MANIFEST, and metadata files,
    are not expected to be empty in any situation.
    
    Introduced a conf, verifyNonEmptyFilesInZip, so that the check can be
    turned off if needed.
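
    For illustration only (a sketch, not part of the patch): assuming
    RocksDBConf entries take the usual
    spark.sql.streaming.stateStore.rocksdb. prefix, the check could be
    disabled with:

        spark.conf.set(
          "spark.sql.streaming.stateStore.rocksdb.verifyNonEmptyFilesInZip",
          "false")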
    
    ### Why are the changes needed?
    State integrity verification. An unexpectedly empty non-log file in the
    snapshot zip indicates a corrupted checkpoint, so it is safer to fail the
    upload and let it be retried.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added new tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #52774 from micheal-o/no_empty_rocksdb_zip.
    
    Authored-by: micheal-o <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  6 +++
 .../sql/execution/streaming/state/RocksDB.scala    | 11 ++++-
 .../streaming/state/RocksDBFileManager.scala       | 21 ++++++--
 .../streaming/state/StateStoreErrors.scala         | 14 ++++++
 .../execution/streaming/state/RocksDBSuite.scala   | 57 +++++++++++++++++++++-
 5 files changed, 103 insertions(+), 6 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index a3d2d53f5fad..2f8bda18ca3a 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5366,6 +5366,12 @@
     ],
     "sqlState" : "42K06"
   },
+  "STATE_STORE_UNEXPECTED_EMPTY_FILE_IN_ROCKSDB_ZIP" : {
+    "message" : [
+      "Detected an empty file <fileName> when trying to write the RocksDB 
snapshot zip file <zipFileName>. This is unexpected, please retry."
+    ],
+    "sqlState" : "XXKST"
+  },
   "STATE_STORE_UNSUPPORTED_OPERATION" : {
     "message" : [
       "<operationType> operation not supported with <entity>"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index c4dfe39f6744..6320e709392f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -1625,7 +1625,8 @@ class RocksDB(
           snapshot.fileMapping,
           Some(snapshot.columnFamilyMapping),
           Some(snapshot.maxColumnFamilyId),
-          snapshot.uniqueId
+          snapshot.uniqueId,
+          verifyNonEmptyFilesInZip = conf.verifyNonEmptyFilesInZip
         )
         fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
 
@@ -1955,6 +1956,7 @@ case class RocksDBConf(
     highPriorityPoolRatio: Double,
     memoryUpdateIntervalMs: Long,
     compressionCodec: String,
+    verifyNonEmptyFilesInZip: Boolean,
     allowFAllocate: Boolean,
     compression: String,
     reportSnapshotUploadLag: Boolean,
@@ -2059,6 +2061,12 @@ object RocksDBConf {
   val COMPRESSION_KEY = "compression"
   private val COMPRESSION_CONF = SQLConfEntry(COMPRESSION_KEY, "lz4")
 
+  // Config to determine whether we should verify that the files written
+  // to the RocksDB snapshot zip file are not empty.
+  val VERIFY_NON_EMPTY_FILES_IN_ZIP_CONF_KEY = "verifyNonEmptyFilesInZip"
+  private val VERIFY_NON_EMPTY_FILES_IN_ZIP_CONF =
+    SQLConfEntry(VERIFY_NON_EMPTY_FILES_IN_ZIP_CONF_KEY, "true")
+
   def apply(storeConf: StateStoreConf): RocksDBConf = {
     val sqlConfs = CaseInsensitiveMap[String](storeConf.sqlConfs)
     val extraConfs = CaseInsensitiveMap[String](storeConf.extraOptions)
@@ -2147,6 +2155,7 @@ object RocksDBConf {
       getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF),
       getPositiveLongConf(MEMORY_UPDATE_INTERVAL_MS_CONF),
       storeConf.compressionCodec,
+      getBooleanConf(VERIFY_NON_EMPTY_FILES_IN_ZIP_CONF),
       getBooleanConf(ALLOW_FALLOCATE_CONF),
       getStringConf(COMPRESSION_CONF),
       storeConf.reportSnapshotUploadLag,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index b99b12879135..77d8f3c5cef8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -266,7 +266,8 @@ class RocksDBFileManager(
       fileMapping: Map[String, RocksDBSnapshotFile],
       columnFamilyMapping: Option[Map[String, ColumnFamilyInfo]] = None,
       maxColumnFamilyId: Option[Short] = None,
-      checkpointUniqueId: Option[String] = None): Unit = {
+      checkpointUniqueId: Option[String] = None,
+      verifyNonEmptyFilesInZip: Boolean = false): Unit = {
     logFilesInDir(checkpointDir, log"Saving checkpoint files " +
       log"for version ${MDC(LogKeys.VERSION_NUM, version)}")
     val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
@@ -312,7 +313,8 @@ class RocksDBFileManager(
           rootDirChecked = true
         }
       }
-      zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version, checkpointUniqueId))
+      zipToDfsFile(localOtherFiles :+ metadataFile,
+        dfsBatchZipFile(version, checkpointUniqueId), verifyNonEmptyFilesInZip)
       logInfo(log"Saved checkpoint file for version ${MDC(LogKeys.VERSION_NUM, 
version)} " +
         log"checkpointUniqueId: ${MDC(LogKeys.UUID, 
checkpointUniqueId.getOrElse(""))}")
     }
@@ -881,7 +883,20 @@ class RocksDBFileManager(
   * Compress files to a single zip file in DFS. Only the file names are embedded in the zip.
    * Any error while writing will ensure that the file is not written.
    */
-  private def zipToDfsFile(files: Seq[File], dfsZipFile: Path): Unit = {
+  private def zipToDfsFile(
+      files: Seq[File],
+      dfsZipFile: Path,
+      verifyNonEmptyFilesInZip: Boolean): Unit = {
+    if (verifyNonEmptyFilesInZip) {
+      // Verify that all files are non-empty
+      files.foreach { file =>
+        // We can have an empty log file, even when WAL is disabled
+        if (!isLogFile(file.getName) && file.length() == 0) {
+          throw StateStoreErrors.unexpectedEmptyFileInRocksDBZip(file.getName, dfsZipFile.toString)
+        }
+      }
+    }
+
     lazy val filesStr = s"$dfsZipFile\n\t${files.mkString("\n\t")}"
     var in: InputStream = null
     val out = fm.createAtomic(dfsZipFile, overwriteIfPossible = true)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 8a44f5c28456..970499a054b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -248,6 +248,12 @@ object StateStoreErrors {
     new StateStoreOperationOutOfOrder(errorMsg)
   }
 
+  def unexpectedEmptyFileInRocksDBZip(
+      fileName: String,
+      zipFileName: String): StateStoreUnexpectedEmptyFileInRocksDBZip = {
+    new StateStoreUnexpectedEmptyFileInRocksDBZip(fileName, zipFileName)
+  }
+
   def cannotLoadStore(e: Throwable): Throwable = {
     e match {
       case e: SparkException
@@ -569,3 +575,11 @@ class StateStoreCommitValidationFailed(
       "missingCommits" -> missingCommits
     )
   )
+
+class StateStoreUnexpectedEmptyFileInRocksDBZip(fileName: String, zipFileName: String)
+  extends SparkException(
+    errorClass = "STATE_STORE_UNEXPECTED_EMPTY_FILE_IN_ROCKSDB_ZIP",
+    messageParameters = Map(
+      "fileName" -> fileName,
+      "zipFileName" -> zipFileName),
+    cause = null)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 94e3bb208bf4..499319c40f6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -1776,6 +1776,57 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
     changelogReader.closeIfNeeded()
   }
 
+  testWithChangelogCheckpointingDisabled("Verify non empty files during zip file upload") {
+    withTempDir { dir =>
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir) { db =>
+        // Creating an empty snapshot should succeed
+        db.load(0)
+        db.commit() // empty snapshot
+        assert(new File(remoteDir, "1.zip").exists())
+
+        // A snapshot with only 1 key should succeed
+        db.load(1)
+        db.put("a", "1")
+        db.commit()
+        assert(new File(remoteDir, "2.zip").exists())
+      }
+    }
+
+    // Now create a snapshot with an empty file
+    withTempDir { dir =>
+      val remoteDir = dir.getCanonicalPath
+      val fileManager = new RocksDBFileManager(
+        remoteDir, Utils.createTempDir(), hadoopConf)
+      // file name -> size
+      val ckptFiles = Seq(
+        "archive/00002.log" -> 0, // not included in the zip
+        "archive/00003.log" -> 10,
+        "001.sst" -> 10,
+        "002.sst" -> 20,
+        "empty-file" -> 0
+      )
+      // Should throw exception
+      val ex = intercept[SparkException] {
+        saveCheckpointFiles(fileManager, ckptFiles, version = 1,
+          numKeys = 10, new RocksDBFileMapping(),
+          verifyNonEmptyFilesInZip = true)
+      }
+
+      checkError(
+        exception = ex,
+        condition = "STATE_STORE_UNEXPECTED_EMPTY_FILE_IN_ROCKSDB_ZIP",
+        parameters = Map(
+          "fileName" -> "empty-file",
+          "zipFileName" -> s"$remoteDir/1.zip"))
+
+      // Shouldn't throw exception if disabled
+      saveCheckpointFiles(fileManager, ckptFiles, version = 1,
+        numKeys = 10, new RocksDBFileMapping(),
+        verifyNonEmptyFilesInZip = false)
+    }
+  }
+
   testWithChangelogCheckpointingEnabled(
     "RocksDBFileManager: read and write v2 changelog with default col family") 
{
     val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
@@ -3786,7 +3837,8 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
       numKeys: Int,
       fileMapping: RocksDBFileMapping,
       numInternalKeys: Int = 0,
-      checkpointUniqueId: Option[String] = None): Unit = {
+      checkpointUniqueId: Option[String] = None,
+      verifyNonEmptyFilesInZip: Boolean = true): Unit = {
     val checkpointDir = Utils.createTempDir().getAbsolutePath // local dir to create checkpoints
     generateFiles(checkpointDir, fileToLengths)
     val (dfsFileSuffix, immutableFileMapping) = fileMapping.createSnapshotFileMapping(
@@ -3797,7 +3849,8 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
       numKeys,
       numInternalKeys,
       immutableFileMapping,
-      checkpointUniqueId = checkpointUniqueId)
+      checkpointUniqueId = checkpointUniqueId,
+      verifyNonEmptyFilesInZip = verifyNonEmptyFilesInZip)
 
     val snapshotInfo = RocksDBVersionSnapshotInfo(version, dfsFileSuffix)
     fileMapping.snapshotsPendingUpload.remove(snapshotInfo)

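A minimal, self-contained sketch of the check this patch adds (a sketch only:
it assumes isLogFile just tests for a ".log" suffix, and it throws a generic
exception where the real code throws
StateStoreErrors.unexpectedEmptyFileInRocksDBZip):

    import java.io.File

    // Assumed stand-in for RocksDBFileManager.isLogFile.
    def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")

    // Fail fast if any non-log file destined for the snapshot zip is empty.
    // Log files are exempt because they can legitimately be empty, e.g. when
    // the WAL is disabled.
    def verifyNonEmptyFiles(files: Seq[File], zipFileName: String): Unit = {
      files.foreach { file =>
        if (!isLogFile(file.getName) && file.length() == 0) {
          throw new IllegalStateException(
            s"Detected an empty file ${file.getName} when trying to write " +
            s"the RocksDB snapshot zip file $zipFileName")
        }
      }
    }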
