This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fb13344556af [SPARK-49372][SS] Ensure that latestSnapshot is set to 
none on close to avoid subsequent use
fb13344556af is described below

commit fb13344556af2e6f05e01b121f36e4448135b8eb
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Mon Aug 26 10:48:09 2024 +0900

    [SPARK-49372][SS] Ensure that latestSnapshot is set to none on close to 
avoid subsequent use
    
    ### What changes were proposed in this pull request?
    Ensure that latestSnapshot is set to none on close to avoid subsequent use
    
    ### Why are the changes needed?
    Changes are needed to ensure that latestSnapshot is not used after db close 
since it might throw an exception in the context of the maintenance thread with 
this trace
    
    ```
    [info]   java.lang.IllegalArgumentException: requirement failed
    [info]   at scala.Predef$.require(Predef.scala:324)
    [info]   at 
org.apache.spark.util.SparkFileUtils.recursiveList(SparkFileUtils.scala:56)
    [info]   at 
org.apache.spark.util.SparkFileUtils.recursiveList$(SparkFileUtils.scala:55)
    [info]   at org.apache.spark.util.Utils$.recursiveList(Utils.scala:99)
    [info]   at 
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.files$lzycompute$1(RocksDBFileManager.scala:801)
    [info]   at 
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.files$2(RocksDBFileManager.scala:801)
    [info]   at 
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.$anonfun$logFilesInDir$3(RocksDBFileManager.scala:804)
    [info]   at 
org.apache.spark.internal.LogEntry.cachedMessageWithContext$lzycompute(Logging.scala:102)
    [info]   at 
org.apache.spark.internal.LogEntry.cachedMessageWithContext(Logging.scala:102)
    [info]   at org.apache.spark.internal.LogEntry.context(Logging.scala:106)
    [info]   at org.apache.spark.internal.Logging.logInfo(Logging.scala:189)
    [info]   at org.apache.spark.internal.Logging.logInfo$(Logging.scala:187)
    [info]   at 
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.logInfo(RocksDBFileManager.scala:126)
    [info]   at 
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.logFilesInDir(RocksDBFileManager.scala:804)
    [info]   at 
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.saveCheckpointToDfs(RocksDBFileManager.scala:256)
    [info]   at 
org.apache.spark.sql.execution.streaming.state.RocksDB.$anonfun$uploadSnapshot$1(RocksDB.scala:730)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests
    
    Before the fix:
    ```
    [info] Run completed in 5 seconds, 666 milliseconds.
    [info] Total number of tests run: 8
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 6, failed 2, canceled 0, ignored 0, pending 0
    [info] *** 2 TESTS FAILED ***
    ```
    
    After the fix:
    ```
    [info] Run completed in 5 seconds, 661 milliseconds.
    [info] Total number of tests run: 8
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #47860 from anishshri-db/task/SPARK-49372.
    
    Authored-by: Anish Shrigondekar <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../sql/execution/streaming/state/RocksDB.scala    |  1 +
 .../execution/streaming/state/RocksDBSuite.scala   | 34 ++++++++++++++++++++++
 2 files changed, 35 insertions(+)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index d743e581df0f..64b3c3646063 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -784,6 +784,7 @@ class RocksDB(
       dbLogger.close()
       synchronized {
         latestSnapshot.foreach(_.close())
+        latestSnapshot = None
       }
       silentDeleteRecursively(localRootDir, "closing RocksDB")
     } catch {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 181ee53d6bb9..90b7c2604076 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -638,6 +638,40 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithColumnFamilies("RocksDB close tests - close before doMaintenance",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 1, compactOnCommit = false)
+    new File(remoteDir).delete() // to make sure that the directory gets 
created
+    withDB(remoteDir, conf = conf,
+      useColumnFamilies = colFamiliesEnabled) { db =>
+      db.load(0)
+      db.put("foo", "bar")
+      db.commit()
+      // call close first and maintenance can still be invoked in the 
context of the
+      // maintenance task's thread pool
+      db.close()
+      db.doMaintenance()
+    }
+  }
+
+  testWithColumnFamilies("RocksDB close tests - close after doMaintenance",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled 
=>
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 1, compactOnCommit = false)
+    new File(remoteDir).delete() // to make sure that the directory gets 
created
+    withDB(remoteDir, conf = conf,
+      useColumnFamilies = colFamiliesEnabled) { db =>
+      db.load(0)
+      db.put("foo", "bar")
+      db.commit()
+      // maintenance can be invoked in the context of the maintenance task's 
thread pool
+      // and close is invoked after that
+      db.doMaintenance()
+      db.close()
+    }
+  }
+
   testWithChangelogCheckpointingEnabled("RocksDB: Unsupported Operations" +
     " with Changelog Checkpointing") {
     val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to