This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fb13344556af [SPARK-49372][SS] Ensure that latestSnapshot is set to
none on close to avoid subsequent use
fb13344556af is described below
commit fb13344556af2e6f05e01b121f36e4448135b8eb
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Mon Aug 26 10:48:09 2024 +0900
[SPARK-49372][SS] Ensure that latestSnapshot is set to none on close to
avoid subsequent use
### What changes were proposed in this pull request?
Ensure that `latestSnapshot` is set to `None` when RocksDB is closed, so that the already-closed snapshot cannot be used afterwards.
### Why are the changes needed?
These changes are needed to ensure that `latestSnapshot` is not used after the DB
is closed. Otherwise, uploading the snapshot from the maintenance thread may throw
an exception with the following trace:
```
[info] java.lang.IllegalArgumentException: requirement failed
[info] at scala.Predef$.require(Predef.scala:324)
[info] at
org.apache.spark.util.SparkFileUtils.recursiveList(SparkFileUtils.scala:56)
[info] at
org.apache.spark.util.SparkFileUtils.recursiveList$(SparkFileUtils.scala:55)
[info] at org.apache.spark.util.Utils$.recursiveList(Utils.scala:99)
[info] at
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.files$lzycompute$1(RocksDBFileManager.scala:801)
[info] at
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.files$2(RocksDBFileManager.scala:801)
[info] at
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.$anonfun$logFilesInDir$3(RocksDBFileManager.scala:804)
[info] at
org.apache.spark.internal.LogEntry.cachedMessageWithContext$lzycompute(Logging.scala:102)
[info] at
org.apache.spark.internal.LogEntry.cachedMessageWithContext(Logging.scala:102)
[info] at org.apache.spark.internal.LogEntry.context(Logging.scala:106)
[info] at org.apache.spark.internal.Logging.logInfo(Logging.scala:189)
[info] at org.apache.spark.internal.Logging.logInfo$(Logging.scala:187)
[info] at
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.logInfo(RocksDBFileManager.scala:126)
[info] at
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.logFilesInDir(RocksDBFileManager.scala:804)
[info] at
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.saveCheckpointToDfs(RocksDBFileManager.scala:256)
[info] at
org.apache.spark.sql.execution.streaming.state.RocksDB.$anonfun$uploadSnapshot$1(RocksDB.scala:730)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests that call `close()` both before and after `doMaintenance()`.
Before the fix:
```
[info] Run completed in 5 seconds, 666 milliseconds.
[info] Total number of tests run: 8
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 6, failed 2, canceled 0, ignored 0, pending 0
[info] *** 2 TESTS FAILED ***
```
After the fix:
```
[info] Run completed in 5 seconds, 661 milliseconds.
[info] Total number of tests run: 8
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47860 from anishshri-db/task/SPARK-49372.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/execution/streaming/state/RocksDB.scala | 1 +
.../execution/streaming/state/RocksDBSuite.scala | 34 ++++++++++++++++++++++
2 files changed, 35 insertions(+)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index d743e581df0f..64b3c3646063 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -784,6 +784,7 @@ class RocksDB(
dbLogger.close()
synchronized {
latestSnapshot.foreach(_.close())
+ latestSnapshot = None
}
silentDeleteRecursively(localRootDir, "closing RocksDB")
} catch {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 181ee53d6bb9..90b7c2604076 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -638,6 +638,40 @@ class RocksDBSuite extends
AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ testWithColumnFamilies("RocksDB close tests - close before doMaintenance",
+ TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled
=>
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(minDeltasForSnapshot = 1, compactOnCommit = false)
+ new File(remoteDir).delete() // to make sure that the directory gets
created
+ withDB(remoteDir, conf = conf,
+ useColumnFamilies = colFamiliesEnabled) { db =>
+ db.load(0)
+ db.put("foo", "bar")
+ db.commit()
+ // call close first and maintenance can be still be invoked in the
context of the
+ // maintenance task's thread pool
+ db.close()
+ db.doMaintenance()
+ }
+ }
+
+ testWithColumnFamilies("RocksDB close tests - close after doMaintenance",
+ TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled
=>
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(minDeltasForSnapshot = 1, compactOnCommit = false)
+ new File(remoteDir).delete() // to make sure that the directory gets
created
+ withDB(remoteDir, conf = conf,
+ useColumnFamilies = colFamiliesEnabled) { db =>
+ db.load(0)
+ db.put("foo", "bar")
+ db.commit()
+ // maintenance can be invoked in the context of the maintenance task's
thread pool
+ // and close is invoked after that
+ db.doMaintenance()
+ db.close()
+ }
+ }
+
testWithChangelogCheckpointingEnabled("RocksDB: Unsupported Operations" +
" with Changelog Checkpointing") {
val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath +
"/state/1/1")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]