This is an automated email from the ASF dual-hosted git repository.
ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 08bd390c1c63 [SPARK-53794][SS] Add option to limit deletions per
maintenance operation associated with rocksdb state provider
08bd390c1c63 is described below
commit 08bd390c1c630345b5bb8889f39450cd57d6d09a
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Mon Oct 6 16:03:12 2025 -0700
[SPARK-53794][SS] Add option to limit deletions per maintenance operation
associated with rocksdb state provider
### What changes were proposed in this pull request?
Add an option to limit the number of deletions per maintenance operation for
the RocksDB state store provider.
### Why are the changes needed?
We see some instances where changelog deletion can take a very long time.
This means that, for that partition, we also cannot upload full snapshots,
which affects recovery/replay scenarios. This problem is much more apparent on
resource-constrained clusters. To address it, we add an option that allows
incremental cleanup on each maintenance operation invocation.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests
```
[info] Run completed in 17 seconds, 591 milliseconds.
[info] Total number of tests run: 8
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52511 from anishshri-db/task/SPARK-53794.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 12 +++
.../sql/execution/streaming/state/RocksDB.scala | 7 +-
.../streaming/state/RocksDBFileManager.scala | 24 ++++-
.../execution/streaming/state/StateStoreConf.scala | 3 +
.../execution/streaming/state/RocksDBSuite.scala | 120 +++++++++++++++++++++
5 files changed, 163 insertions(+), 3 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index eea92dffb048..c17e6910a561 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2664,6 +2664,16 @@ object SQLConf {
.doubleConf
.createWithDefault(0.3)
+ val MAX_VERSIONS_TO_DELETE_PER_MAINTENANCE =
+
buildConf("spark.sql.streaming.stateStore.maxVersionsToDeletePerMaintenance")
+ .internal()
+ .doc("The maximum number of versions to delete per maintenance operation.
By default, " +
+ "this value is set to -1, which means no limit. Note that, currently
this is only " +
+ "supported for the RocksDB state store provider.")
+ .version("4.1.0")
+ .intConf
+ .createWithDefault(-1)
+
val MAX_BATCHES_TO_RETAIN_IN_MEMORY =
buildConf("spark.sql.streaming.maxBatchesToRetainInMemory")
.internal()
.doc("The maximum number of batches which will be retained in memory to
avoid " +
@@ -6693,6 +6703,8 @@ class SQLConf extends Serializable with Logging with
SqlApiConf {
def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
+ def maxVersionsToDeletePerMaintenance: Int =
getConf(MAX_VERSIONS_TO_DELETE_PER_MAINTENANCE)
+
def ratioExtraSpaceAllowedInCheckpoint: Double =
getConf(RATIO_EXTRA_SPACE_ALLOWED_IN_CHECKPOINT)
def maxBatchesToRetainInMemory: Int =
getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 651617119f45..774ed23ed55b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -1396,6 +1396,7 @@ class RocksDB(
val cleanupTime = timeTakenMs {
fileManager.deleteOldVersions(
numVersionsToRetain = conf.minVersionsToRetain,
+ maxVersionsToDeletePerMaintenance =
conf.maxVersionsToDeletePerMaintenance,
minVersionsToDelete = conf.minVersionsToDelete)
}
logInfo(log"Cleaned old data, time taken: ${MDC(LogKeys.TIME_UNITS,
cleanupTime)} ms")
@@ -1953,7 +1954,8 @@ case class RocksDBConf(
compressionCodec: String,
allowFAllocate: Boolean,
compression: String,
- reportSnapshotUploadLag: Boolean)
+ reportSnapshotUploadLag: Boolean,
+ maxVersionsToDeletePerMaintenance: Int)
object RocksDBConf {
/** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -2144,7 +2146,8 @@ object RocksDBConf {
storeConf.compressionCodec,
getBooleanConf(ALLOW_FALLOCATE_CONF),
getStringConf(COMPRESSION_CONF),
- storeConf.reportSnapshotUploadLag)
+ storeConf.reportSnapshotUploadLag,
+ storeConf.maxVersionsToDeletePerMaintenance)
}
def apply(): RocksDBConf = apply(new StateStoreConf())
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 2015a5e27a2c..b99b12879135 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -580,8 +580,22 @@ class RocksDBFileManager(
* - Partially written SST files
* - SST files that were used in a version, but that version got overwritten
with a different
* set of SST files.
+ *
+ * @param numVersionsToRetain the number of RocksDB versions to keep in
object store after the
+ * deletion. Must be greater than 0, or -1 to
retain all versions.
+ * @param maxVersionsToDeletePerMaintenance the max number of RocksDB
versions
+ * to delete per maintenance operation.
+ * Must be greater than 0, or -1 to delete all
stale versions.
+ * @param minVersionsToDelete the min number of stale versions required to
trigger deletion.
+ * If it's set to <= 0, then we will always
perform list operations
+ * to determine deletion candidates. If set to a
positive value, then
+ * we will skip deletion if the number of stale
versions is less than
+ * this value.
*/
- def deleteOldVersions(numVersionsToRetain: Int, minVersionsToDelete: Long =
0): Unit = {
+ def deleteOldVersions(
+ numVersionsToRetain: Int,
+ maxVersionsToDeletePerMaintenance: Int = -1,
+ minVersionsToDelete: Long = 0): Unit = {
// Check if enough stale version files present
if (shouldSkipDeletion(numVersionsToRetain, minVersionsToDelete)) return
@@ -603,14 +617,22 @@ class RocksDBFileManager(
// Find the versions to delete
val maxSnapshotVersionPresent = sortedSnapshotVersionsAndUniqueIds.last._1
+ val minSnapshotVersionPresent = sortedSnapshotVersionsAndUniqueIds.head._1
// In order to reconstruct numVersionsToRetain version, retain the latest
snapshot
// that satisfies (version <= maxSnapshotVersionPresent -
numVersionsToRetain + 1).
+ // Also require
+ // minVersionToRetain <= minSnapshotVersionPresent +
maxVersionsToDeletePerMaintenance.
// If none of the snapshots satisfy the condition, minVersionToRetain will
be 0 and
// no version gets deleted.
val minVersionToRetain = sortedSnapshotVersionsAndUniqueIds
.map(_._1)
.filter(_ <= maxSnapshotVersionPresent - numVersionsToRetain + 1)
+ .filter( v =>
+ if (maxVersionsToDeletePerMaintenance != -1) {
+ v <= minSnapshotVersionPresent + maxVersionsToDeletePerMaintenance
+ } else true
+ )
.foldLeft(0L)(math.max)
// When snapshotVersionToDelete is non-empty, there are at least 2
snapshot versions.
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index 00bb7de46dc4..3cf302cd2be8 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -62,6 +62,9 @@ class StateStoreConf(
/** Maximum count of versions a State Store implementation should retain in
memory */
val maxVersionsToRetainInMemory: Int = sqlConf.maxBatchesToRetainInMemory
+ /** Maximum number of versions to delete per maintenance operation */
+ val maxVersionsToDeletePerMaintenance: Int =
sqlConf.maxVersionsToDeletePerMaintenance
+
/**
* Optional fully qualified name of the subclass of [[StateStoreProvider]]
* managing state data. That is, the implementation of the State Store to
use.
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 6f4125bb8b5c..801e74d288b4 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -783,6 +783,126 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
}
}
+ testWithStateStoreCheckpointIdsAndColumnFamilies(
+ "RocksDB: purge version files with minVersionsToDelete > 0 " +
+ "and maxVersionsToDeletePerMaintenance > 0",
+ TestWithBothChangelogCheckpointingEnabledAndDisabled) {
+ case (enableStateStoreCheckpointIds, colFamiliesEnabled) =>
+ val remoteDir = Utils.createTempDir().toString
+ new File(remoteDir).delete() // to make sure that the directory gets
created
+ val conf = dbConf.copy(
+ minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete
= 3,
+ maxVersionsToDeletePerMaintenance = 1)
+ withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled,
+ enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db =>
+ // Commit 5 versions
+ // stale versions: (1, 2)
+ // keep versions: (3, 4, 5)
+ for (version <- 0 to 4) {
+ // Should upload latest snapshot but not delete any files
+ // since number of stale versions < minVersionsToDelete
+ db.load(version)
+ db.commit()
+ db.doMaintenance()
+ }
+
+ // Commit 1 more version
+ // stale versions: (1, 2, 3)
+ // keep versions: (4, 5, 6)
+ db.load(5)
+ db.commit()
+
+ // Checkpoint directory before maintenance
+ if (isChangelogCheckpointingEnabled) {
+ assert(snapshotVersionsPresent(remoteDir) == (1 to 5))
+ assert(changelogVersionsPresent(remoteDir) == (1 to 6))
+ } else {
+ assert(snapshotVersionsPresent(remoteDir) == (1 to 6))
+ }
+
+ // Should delete stale versions for zip files and change log files
+ // since number of stale versions >= minVersionsToDelete
+ db.doMaintenance()
+
+ // Checkpoint directory after maintenance
+ // Verify that only one version is deleted because
maxVersionsToDeletePerMaintenance = 1
+ assert(snapshotVersionsPresent(remoteDir) == Seq(2, 3, 4, 5, 6))
+ if (isChangelogCheckpointingEnabled) {
+ assert(changelogVersionsPresent(remoteDir) == Seq(2, 3, 4, 5, 6))
+ }
+
+ // Commit 1 more version to ensure that minVersionsToDelete constraint
is satisfied
+ db.load(6)
+ db.commit()
+ db.doMaintenance()
+ // Verify that only one version is deleted because
maxVersionsToDeletePerMaintenance = 1
+ assert(snapshotVersionsPresent(remoteDir) == Seq(3, 4, 5, 6, 7))
+ if (isChangelogCheckpointingEnabled) {
+ assert(changelogVersionsPresent(remoteDir) == Seq(3, 4, 5, 6, 7))
+ }
+ }
+ }
+
+ testWithStateStoreCheckpointIdsAndColumnFamilies(
+ "RocksDB: purge version files with minVersionsToDelete <
maxVersionsToDeletePerMaintenance",
+ TestWithBothChangelogCheckpointingEnabledAndDisabled) {
+ case (enableStateStoreCheckpointIds, colFamiliesEnabled) =>
+ val remoteDir = Utils.createTempDir().toString
+ new File(remoteDir).delete() // to make sure that the directory gets
created
+ val conf = dbConf.copy(
+ minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete
= 1,
+ maxVersionsToDeletePerMaintenance = 2)
+ withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled,
+ enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db =>
+ // Commit 5 versions
+ // stale versions: (1, 2)
+ // keep versions: (3, 4, 5)
+ for (version <- 0 to 4) {
+ // Should upload latest snapshot; stale versions may also be deleted here
+ // since minVersionsToDelete = 1 in this test
+ db.load(version)
+ db.commit()
+ db.doMaintenance()
+ }
+
+ // Commit 1 more version
+ // stale versions: (1, 2, 3)
+ // keep versions: (4, 5, 6)
+ db.load(5)
+ db.commit()
+
+ // Checkpoint directory before maintenance
+ // Verify that 2 oldest stale versions are deleted
+ if (isChangelogCheckpointingEnabled) {
+ assert(snapshotVersionsPresent(remoteDir) == Seq(3, 4, 5))
+ assert(changelogVersionsPresent(remoteDir) == Seq(3, 4, 5, 6))
+ } else {
+ assert(snapshotVersionsPresent(remoteDir) == Seq(3, 4, 5, 6))
+ }
+
+ // Should delete stale versions for zip files and change log files
+ // since number of stale versions >= minVersionsToDelete
+ db.doMaintenance()
+
+ // Checkpoint directory after maintenance
+ // Verify that only one version is deleted since that's the only stale
version left
+ assert(snapshotVersionsPresent(remoteDir) == Seq(4, 5, 6))
+ if (isChangelogCheckpointingEnabled) {
+ assert(changelogVersionsPresent(remoteDir) == Seq(4, 5, 6))
+ }
+
+ // Commit 1 more version to ensure that minVersionsToDelete constraint
is satisfied
+ db.load(6)
+ db.commit()
+ db.doMaintenance()
+ // Verify that only one version is deleted since that's the only stale
version left
+ assert(snapshotVersionsPresent(remoteDir) == Seq(5, 6, 7))
+ if (isChangelogCheckpointingEnabled) {
+ assert(changelogVersionsPresent(remoteDir) == Seq(5, 6, 7))
+ }
+ }
+ }
+
testWithStateStoreCheckpointIdsAndColumnFamilies(
"RocksDB: minDeltasForSnapshot",
TestWithChangelogCheckpointingEnabled) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]