This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 08bd390c1c63 [SPARK-53794][SS] Add option to limit deletions per 
maintenance operation associated with rocksdb state provider
08bd390c1c63 is described below

commit 08bd390c1c630345b5bb8889f39450cd57d6d09a
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Mon Oct 6 16:03:12 2025 -0700

    [SPARK-53794][SS] Add option to limit deletions per maintenance operation 
associated with rocksdb state provider
    
    ### What changes were proposed in this pull request?
    Add option to limit deletions per maintenance operation associated with 
rocksdb state provider
    
    ### Why are the changes needed?
    We see some instances where the changelog deletion can take a really long 
time. This means that for that partition, we also cannot upload full snapshots, 
which affects recovery/replay scenarios. This problem is much more apparent on 
resource-constrained clusters. So, we add an option to allow for incremental 
cleanup per maintenance operation invocation.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests
    
    ```
    [info] Run completed in 17 seconds, 591 milliseconds.
    [info] Total number of tests run: 8
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #52511 from anishshri-db/task/SPARK-53794.
    
    Authored-by: Anish Shrigondekar <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  12 +++
 .../sql/execution/streaming/state/RocksDB.scala    |   7 +-
 .../streaming/state/RocksDBFileManager.scala       |  24 ++++-
 .../execution/streaming/state/StateStoreConf.scala |   3 +
 .../execution/streaming/state/RocksDBSuite.scala   | 120 +++++++++++++++++++++
 5 files changed, 163 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index eea92dffb048..c17e6910a561 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2664,6 +2664,16 @@ object SQLConf {
     .doubleConf
     .createWithDefault(0.3)
 
+  val MAX_VERSIONS_TO_DELETE_PER_MAINTENANCE =
+    
buildConf("spark.sql.streaming.stateStore.maxVersionsToDeletePerMaintenance")
+    .internal()
+    .doc("The maximum number of versions to delete per maintenance operation. 
By default, " +
+      "this value is set to -1, which means no limit. Note that, currently 
this is only " +
+      "supported for the RocksDB state store provider.")
+    .version("4.1.0")
+    .intConf
+    .createWithDefault(-1)
+
   val MAX_BATCHES_TO_RETAIN_IN_MEMORY = 
buildConf("spark.sql.streaming.maxBatchesToRetainInMemory")
     .internal()
     .doc("The maximum number of batches which will be retained in memory to 
avoid " +
@@ -6693,6 +6703,8 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
 
   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
 
+  def maxVersionsToDeletePerMaintenance: Int = 
getConf(MAX_VERSIONS_TO_DELETE_PER_MAINTENANCE)
+
   def ratioExtraSpaceAllowedInCheckpoint: Double = 
getConf(RATIO_EXTRA_SPACE_ALLOWED_IN_CHECKPOINT)
 
   def maxBatchesToRetainInMemory: Int = 
getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 651617119f45..774ed23ed55b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -1396,6 +1396,7 @@ class RocksDB(
     val cleanupTime = timeTakenMs {
       fileManager.deleteOldVersions(
         numVersionsToRetain = conf.minVersionsToRetain,
+        maxVersionsToDeletePerMaintenance = 
conf.maxVersionsToDeletePerMaintenance,
         minVersionsToDelete = conf.minVersionsToDelete)
     }
     logInfo(log"Cleaned old data, time taken: ${MDC(LogKeys.TIME_UNITS, 
cleanupTime)} ms")
@@ -1953,7 +1954,8 @@ case class RocksDBConf(
     compressionCodec: String,
     allowFAllocate: Boolean,
     compression: String,
-    reportSnapshotUploadLag: Boolean)
+    reportSnapshotUploadLag: Boolean,
+    maxVersionsToDeletePerMaintenance: Int)
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -2144,7 +2146,8 @@ object RocksDBConf {
       storeConf.compressionCodec,
       getBooleanConf(ALLOW_FALLOCATE_CONF),
       getStringConf(COMPRESSION_CONF),
-      storeConf.reportSnapshotUploadLag)
+      storeConf.reportSnapshotUploadLag,
+      storeConf.maxVersionsToDeletePerMaintenance)
   }
 
   def apply(): RocksDBConf = apply(new StateStoreConf())
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 2015a5e27a2c..b99b12879135 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -580,8 +580,22 @@ class RocksDBFileManager(
    * - Partially written SST files
    * - SST files that were used in a version, but that version got overwritten 
with a different
    *   set of SST files.
+   *
+   * @param numVersionsToRetain the number of RocksDB versions to keep in 
object store after the
+   *                            deletion. Must be greater than 0, or -1 to 
retain all versions.
+   * @param maxVersionsToDeletePerMaintenance the max number of RocksDB 
versions
+   *                            to delete per maintenance operation.
+   *                            Must be greater than 0, or -1 to delete all 
stale versions.
+   * @param minVersionsToDelete the min number of stale versions required to 
trigger deletion.
+   *                            If it is set to <= 0, then we will always 
perform list operations
+   *                            to determine deletion candidates. If set to a 
positive value, then
+   *                            we will skip deletion if the number of stale 
versions is less than
+   *                            this value.
    */
-  def deleteOldVersions(numVersionsToRetain: Int, minVersionsToDelete: Long = 
0): Unit = {
+  def deleteOldVersions(
+      numVersionsToRetain: Int,
+      maxVersionsToDeletePerMaintenance: Int = -1,
+      minVersionsToDelete: Long = 0): Unit = {
     // Check if enough stale version files present
     if (shouldSkipDeletion(numVersionsToRetain, minVersionsToDelete)) return
 
@@ -603,14 +617,22 @@ class RocksDBFileManager(
 
     // Find the versions to delete
     val maxSnapshotVersionPresent = sortedSnapshotVersionsAndUniqueIds.last._1
+    val minSnapshotVersionPresent = sortedSnapshotVersionsAndUniqueIds.head._1
 
     // In order to reconstruct numVersionsToRetain version, retain the latest 
snapshot
     // that satisfies (version <= maxSnapshotVersionPresent - 
numVersionsToRetain + 1).
+    // Also require
+    // minVersionToRetain <= minSnapshotVersionPresent + 
maxVersionsToDeletePerMaintenance.
     // If none of the snapshots satisfy the condition, minVersionToRetain will 
be 0 and
     // no version gets deleted.
     val minVersionToRetain = sortedSnapshotVersionsAndUniqueIds
       .map(_._1)
       .filter(_ <= maxSnapshotVersionPresent - numVersionsToRetain + 1)
+      .filter( v =>
+        if (maxVersionsToDeletePerMaintenance != -1) {
+          v <= minSnapshotVersionPresent + maxVersionsToDeletePerMaintenance
+        } else true
+      )
       .foldLeft(0L)(math.max)
 
     // When snapshotVersionToDelete is non-empty, there are at least 2 
snapshot versions.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index 00bb7de46dc4..3cf302cd2be8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -62,6 +62,9 @@ class StateStoreConf(
   /** Maximum count of versions a State Store implementation should retain in 
memory */
   val maxVersionsToRetainInMemory: Int = sqlConf.maxBatchesToRetainInMemory
 
+  /** Maximum number of versions to delete per maintenance operation */
+  val maxVersionsToDeletePerMaintenance: Int = 
sqlConf.maxVersionsToDeletePerMaintenance
+
   /**
    * Optional fully qualified name of the subclass of [[StateStoreProvider]]
    * managing state data. That is, the implementation of the State Store to 
use.
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 6f4125bb8b5c..801e74d288b4 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -783,6 +783,126 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
       }
   }
 
+  testWithStateStoreCheckpointIdsAndColumnFamilies(
+    "RocksDB: purge version files with minVersionsToDelete > 0 " +
+    "and maxVersionsToDeletePerMaintenance > 0",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) {
+    case (enableStateStoreCheckpointIds, colFamiliesEnabled) =>
+      val remoteDir = Utils.createTempDir().toString
+      new File(remoteDir).delete() // to make sure that the directory gets 
created
+      val conf = dbConf.copy(
+        minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete 
= 3,
+        maxVersionsToDeletePerMaintenance = 1)
+      withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled,
+        enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db =>
+        // Commit 5 versions
+        // stale versions: (1, 2)
+        // keep versions: (3, 4, 5)
+        for (version <- 0 to 4) {
+          // Should upload latest snapshot but not delete any files
+          // since number of stale versions < minVersionsToDelete
+          db.load(version)
+          db.commit()
+          db.doMaintenance()
+        }
+
+        // Commit 1 more version
+        // stale versions: (1, 2, 3)
+        // keep versions: (4, 5, 6)
+        db.load(5)
+        db.commit()
+
+        // Checkpoint directory before maintenance
+        if (isChangelogCheckpointingEnabled) {
+          assert(snapshotVersionsPresent(remoteDir) == (1 to 5))
+          assert(changelogVersionsPresent(remoteDir) == (1 to 6))
+        } else {
+          assert(snapshotVersionsPresent(remoteDir) == (1 to 6))
+        }
+
+        // Should delete stale versions for zip files and change log files
+        // since number of stale versions >= minVersionsToDelete
+        db.doMaintenance()
+
+        // Checkpoint directory after maintenance
+        // Verify that only one version is deleted because 
maxVersionsToDeletePerMaintenance = 1
+        assert(snapshotVersionsPresent(remoteDir) == Seq(2, 3, 4, 5, 6))
+        if (isChangelogCheckpointingEnabled) {
+          assert(changelogVersionsPresent(remoteDir) == Seq(2, 3, 4, 5, 6))
+        }
+
+        // Commit 1 more version to ensure that minVersionsToDelete constraint 
is satisfied
+        db.load(6)
+        db.commit()
+        db.doMaintenance()
+        // Verify that only one version is deleted because 
maxVersionsToDeletePerMaintenance = 1
+        assert(snapshotVersionsPresent(remoteDir) == Seq(3, 4, 5, 6, 7))
+        if (isChangelogCheckpointingEnabled) {
+          assert(changelogVersionsPresent(remoteDir) == Seq(3, 4, 5, 6, 7))
+        }
+      }
+  }
+
+  testWithStateStoreCheckpointIdsAndColumnFamilies(
+    "RocksDB: purge version files with minVersionsToDelete < 
maxVersionsToDeletePerMaintenance",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) {
+    case (enableStateStoreCheckpointIds, colFamiliesEnabled) =>
+      val remoteDir = Utils.createTempDir().toString
+      new File(remoteDir).delete() // to make sure that the directory gets 
created
+      val conf = dbConf.copy(
+        minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete 
= 1,
+        maxVersionsToDeletePerMaintenance = 2)
+      withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled,
+        enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db =>
+        // Commit 5 versions
+        // stale versions: (1, 2)
+        // keep versions: (3, 4, 5)
+        for (version <- 0 to 4) {
+          // Should upload latest snapshot and, since minVersionsToDelete = 1,
+          // delete up to maxVersionsToDeletePerMaintenance stale versions
+          db.load(version)
+          db.commit()
+          db.doMaintenance()
+        }
+
+        // Commit 1 more version
+        // stale versions: (1, 2, 3)
+        // keep versions: (4, 5, 6)
+        db.load(5)
+        db.commit()
+
+        // Checkpoint directory before maintenance
+        // Verify that 2 oldest stale versions are deleted
+        if (isChangelogCheckpointingEnabled) {
+          assert(snapshotVersionsPresent(remoteDir) == Seq(3, 4, 5))
+          assert(changelogVersionsPresent(remoteDir) == Seq(3, 4, 5, 6))
+        } else {
+          assert(snapshotVersionsPresent(remoteDir) == Seq(3, 4, 5, 6))
+        }
+
+        // Should delete stale versions for zip files and change log files
+        // since number of stale versions >= minVersionsToDelete
+        db.doMaintenance()
+
+        // Checkpoint directory after maintenance
+        // Verify that only one version is deleted since that's the only stale 
version left
+        assert(snapshotVersionsPresent(remoteDir) == Seq(4, 5, 6))
+        if (isChangelogCheckpointingEnabled) {
+          assert(changelogVersionsPresent(remoteDir) == Seq(4, 5, 6))
+        }
+
+        // Commit 1 more version to ensure that minVersionsToDelete constraint 
is satisfied
+        db.load(6)
+        db.commit()
+        db.doMaintenance()
+        // Verify that only one version is deleted since that's the only stale 
version left
+        assert(snapshotVersionsPresent(remoteDir) == Seq(5, 6, 7))
+        if (isChangelogCheckpointingEnabled) {
+          assert(changelogVersionsPresent(remoteDir) == Seq(5, 6, 7))
+        }
+      }
+  }
+
   testWithStateStoreCheckpointIdsAndColumnFamilies(
     "RocksDB: minDeltasForSnapshot",
     TestWithChangelogCheckpointingEnabled) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to