This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new 5550bf0fe9a0 [SPARK-51675][SS] Fix col family creation after opening local DB to avoid snapshot creation, if not necessary 5550bf0fe9a0 is described below commit 5550bf0fe9a0d8a89d07ba0efe506adf60898c63 Author: Anish Shrigondekar <anish.shrigonde...@databricks.com> AuthorDate: Tue Apr 1 15:42:49 2025 +0900 [SPARK-51675][SS] Fix col family creation after opening local DB to avoid snapshot creation, if not necessary ### What changes were proposed in this pull request? Fix col family creation after opening local DB to avoid snapshot creation, if not necessary ### Why are the changes needed? Without this, we might force snapshot creation where it's not required ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Fixed unit tests ``` [info] Run completed in 2 minutes, 1 second. [info] Total number of tests run: 4 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #50471 from anishshri-db/task/SPARK-51675. 
Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> (cherry picked from commit c0fbc6ba7d2d8866d4266bcdf75729c5c2decd40) Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/execution/streaming/state/RocksDB.scala | 4 +++- .../execution/streaming/state/RocksDBSuite.scala | 22 +++------------------- 2 files changed, 6 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 72ff4c480667..5fc73e1cbd40 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -528,11 +528,13 @@ class RocksDB( maxColumnFamilyId.set(maxId) } + openDB() + // Call this after opening the DB to ensure that forcing snapshot is not triggered + // unnecessarily. 
if (useColumnFamilies) { createColFamilyIfAbsent(StateStore.DEFAULT_COL_FAMILY_NAME, isInternal = false) } - openDB() val (numKeys, numInternalKeys) = { if (!conf.trackTotalNumberOfRows) { // we don't track the total number of rows - discard the number being track diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 9a79f3fa0ae8..b5a2ec43001a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -949,13 +949,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession db.commit() } - if (enableStateStoreCheckpointIds && colFamiliesEnabled) { - // This is because 30 is executed twice and snapshot does not overwrite in checkpoint v2 - assert(snapshotVersionsPresent(remoteDir) === (1 to 30) :+ 30 :+ 31) - } else { - assert(snapshotVersionsPresent(remoteDir) === (1 to 30)) - } - + assert(snapshotVersionsPresent(remoteDir) === (1 to 30)) assert(changelogVersionsPresent(remoteDir) === (30 to 60)) for (version <- 1 to 60) { db.load(version, versionToUniqueId.get(version), readOnly = true) @@ -972,20 +966,10 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession // Check that snapshots and changelogs get purged correctly. db.doMaintenance() - // Behavior is slightly different when column families are enabled with checkpoint v2 - // since snapshot version 31 was created previously. 
- if (enableStateStoreCheckpointIds && colFamiliesEnabled) { - assert(snapshotVersionsPresent(remoteDir) === Seq(31, 60, 60)) - } else { - assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60)) - } + assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60)) if (enableStateStoreCheckpointIds) { // recommit version 60 creates another changelog file with different unique id - if (colFamiliesEnabled) { - assert(changelogVersionsPresent(remoteDir) === (31 to 60) :+ 60) - } else { - assert(changelogVersionsPresent(remoteDir) === (30 to 60) :+ 60) - } + assert(changelogVersionsPresent(remoteDir) === (30 to 60) :+ 60) } else { assert(changelogVersionsPresent(remoteDir) === (30 to 60)) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org