This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 5550bf0fe9a0 [SPARK-51675][SS] Fix col family creation after opening local DB to avoid snapshot creation, if not necessary
5550bf0fe9a0 is described below

commit 5550bf0fe9a0d8a89d07ba0efe506adf60898c63
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Tue Apr 1 15:42:49 2025 +0900

    [SPARK-51675][SS] Fix col family creation after opening local DB to avoid snapshot creation, if not necessary
    
    ### What changes were proposed in this pull request?
    Move the creation of the default column family to after the local RocksDB instance is opened, so that a snapshot is not forced when it is not necessary.
    
    ### Why are the changes needed?
    Without this, we might force snapshot creation where it's not required.
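
    A minimal standalone sketch of the idea (not the actual RocksDB.scala; the set of
    known column families and the snapshot flag are simplified stand-ins for the real
    bookkeeping): creating the default column family before the DB is opened registers
    it as a new family and can force a snapshot, while creating it after `openDB()` is
    effectively a no-op.

    ```scala
    // Standalone sketch, not the actual Spark code: names and the snapshot flag
    // are simplified stand-ins for RocksDB.scala's internal bookkeeping.
    object ColFamilyOrderingSketch {
      private val knownColFamilies = scala.collection.mutable.Set.empty[String]
      private var shouldForceSnapshot = false

      // Assume the loaded checkpoint already contains the default column family.
      private def openDB(): Unit = knownColFamilies += "default"

      // Creating a family that is not already known counts as a structural change
      // and, in this sketch, forces a snapshot on the next commit.
      private def createColFamilyIfAbsent(name: String): Unit = {
        if (knownColFamilies.add(name)) shouldForceSnapshot = true
      }

      def main(args: Array[String]): Unit = {
        openDB()
        // After openDB() the default family is already present, so this call is a
        // no-op and no snapshot is forced unnecessarily.
        createColFamilyIfAbsent("default")
        assert(!shouldForceSnapshot)
        println(s"shouldForceSnapshot = $shouldForceSnapshot")
      }
    }
    ```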
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Fixed unit tests
    
    ```
    [info] Run completed in 2 minutes, 1 second.
    [info] Total number of tests run: 4
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #50471 from anishshri-db/task/SPARK-51675.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit c0fbc6ba7d2d8866d4266bcdf75729c5c2decd40)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/state/RocksDB.scala    |  4 +++-
 .../execution/streaming/state/RocksDBSuite.scala   | 22 +++-------------------
 2 files changed, 6 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 72ff4c480667..5fc73e1cbd40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -528,11 +528,13 @@ class RocksDB(
       maxColumnFamilyId.set(maxId)
     }
 
+    openDB()
+    // Call this after opening the DB to ensure that forcing snapshot is not triggered
+    // unnecessarily.
     if (useColumnFamilies) {
       createColFamilyIfAbsent(StateStore.DEFAULT_COL_FAMILY_NAME, isInternal = false)
     }
 
-    openDB()
     val (numKeys, numInternalKeys) = {
       if (!conf.trackTotalNumberOfRows) {
         // we don't track the total number of rows - discard the number being track
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 9a79f3fa0ae8..b5a2ec43001a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -949,13 +949,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
         db.commit()
       }
 
-      if (enableStateStoreCheckpointIds && colFamiliesEnabled) {
-        // This is because 30 is executed twice and snapshot does not overwrite in checkpoint v2
-        assert(snapshotVersionsPresent(remoteDir) === (1 to 30) :+ 30 :+ 31)
-      } else {
-        assert(snapshotVersionsPresent(remoteDir) === (1 to 30))
-      }
-
+      assert(snapshotVersionsPresent(remoteDir) === (1 to 30))
       assert(changelogVersionsPresent(remoteDir) === (30 to 60))
       for (version <- 1 to 60) {
         db.load(version, versionToUniqueId.get(version), readOnly = true)
@@ -972,20 +966,10 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
       // Check that snapshots and changelogs get purged correctly.
       db.doMaintenance()
 
-      // Behavior is slightly different when column families are enabled with checkpoint v2
-      // since snapshot version 31 was created previously.
-      if (enableStateStoreCheckpointIds && colFamiliesEnabled) {
-        assert(snapshotVersionsPresent(remoteDir) === Seq(31, 60, 60))
-      } else {
-        assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
-      }
+      assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
       if (enableStateStoreCheckpointIds) {
         // recommit version 60 creates another changelog file with different unique id
-        if (colFamiliesEnabled) {
-          assert(changelogVersionsPresent(remoteDir) === (31 to 60) :+ 60)
-        } else {
-          assert(changelogVersionsPresent(remoteDir) === (30 to 60) :+ 60)
-        }
+        assert(changelogVersionsPresent(remoteDir) === (30 to 60) :+ 60)
       } else {
         assert(changelogVersionsPresent(remoteDir) === (30 to 60))
       }

