This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new aead8224ae01 [SPARK-54078][SS] Deflake StateStoreSuite `SPARK-40492: 
maintenance before unload`
aead8224ae01 is described below

commit aead8224ae013f154e5e34049146e9e7675368b5
Author: Livia Zhu <[email protected]>
AuthorDate: Thu Oct 30 10:13:13 2025 -0700

    [SPARK-54078][SS] Deflake StateStoreSuite `SPARK-40492: maintenance before 
unload`
    
    ### What changes were proposed in this pull request?
    
    Fix flakiness in this suite caused by occasional flakiness in the 
maintenance thread and long maintenance durations. Instead, test the required 
functionality (maintenance is called before unloading for deactivated 
instances) by adding a function that allows us to pause/unpause maintenance.
    
    ### Why are the changes needed?
    
    Fix test flakiness.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #52783 from liviazhu/liviazhu-db/statestore-flakiness.
    
    Authored-by: Livia Zhu <[email protected]>
    Signed-off-by: Anish Shrigondekar <[email protected]>
---
 .../sql/execution/streaming/state/StateStore.scala | 10 ++++++++++
 .../streaming/state/StateStoreSuite.scala          | 22 +++++++++++++++-------
 2 files changed, 25 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 701b51a896a6..539545041e4d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -1343,6 +1343,12 @@ object StateStore extends Logging {
     }
   }
 
+  // Pause maintenance for testing purposes only.
+  @volatile private var maintenancePaused: Boolean = false
+  private[spark] def setMaintenancePaused(maintPaused: Boolean): Unit = {
+      maintenancePaused = maintPaused
+  }
+
   /**
    * Execute background maintenance task in all the loaded store providers if 
they are still
    * the active instances according to the coordinator.
@@ -1352,6 +1358,10 @@ object StateStore extends Logging {
     if (SparkEnv.get == null) {
       throw new IllegalStateException("SparkEnv not active, cannot do 
maintenance on StateStores")
     }
+    if (maintenancePaused) {
+      logDebug("Maintenance paused")
+      return
+    }
 
     // Providers that couldn't be processed now and need to be added back to 
the queue
     val providersToRequeue = new ArrayBuffer[(StateStoreProviderId, 
StateStoreProvider)]()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 1acf239df85b..f615dab6adee 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -1007,6 +1007,7 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  // Ensure that maintenance is called before unloading
   test("SPARK-40492: maintenance before unload") {
     val conf = new SparkConf()
       .setMaster("local")
@@ -1016,9 +1017,8 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     val storeProviderId1 = StateStoreProviderId(StateStoreId(dir1, opId, 0), 
UUID.randomUUID)
     val sqlConf = 
getDefaultSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
       SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get)
-    sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
-    // Make maintenance interval large so that maintenance is called after 
deactivating instances.
-    sqlConf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 1.minute.toMillis)
+    sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 10)
+    sqlConf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 10L)
     val storeConf = StateStoreConf(sqlConf)
     val hadoopConf = new Configuration()
 
@@ -1058,17 +1058,23 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
               assert(snapshotVersions.nonEmpty, "no snapshot file found")
             }
           }
+          // Pause maintenance
+          StateStore.setMaintenancePaused(true)
+
           // Generate more versions such that there is another snapshot.
           generateStoreVersions()
 
           // If driver decides to deactivate all stores related to a query run,
           // then this instance should be unloaded.
           coordinatorRef.deactivateInstances(storeProviderId1.queryRunId)
+
+          // Resume maintenance which should unload the deactivated store
+          StateStore.setMaintenancePaused(false)
           eventually(timeout(timeoutDuration)) {
             assert(!StateStore.isLoaded(storeProviderId1))
           }
 
-          // Earliest delta file should be scheduled a cleanup during unload.
+          // Ensure the earliest delta file should be cleaned up during unload.
           tryWithProviderResource(newStoreProvider(storeProviderId1.storeId)) 
{ provider =>
             eventually(timeout(timeoutDuration)) {
               assert(!fileExists(provider, 1, isSnapshot = false), "earliest 
file not deleted")
@@ -2412,9 +2418,11 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
       isSnapshot: Boolean): Boolean = {
     val method = PrivateMethod[Path](Symbol("baseDir"))
     val basePath = provider invokePrivate method()
-    val fileName = if (isSnapshot) s"$version.snapshot" else s"$version.delta"
-    val filePath = new File(basePath.toString, fileName)
-    filePath.exists
+    val fileNameHDFS = if (isSnapshot) s"$version.snapshot" else 
s"$version.delta"
+    val filePathHDFS = new File(basePath.toString, fileNameHDFS)
+    val fileNameRocks = if (isSnapshot) s"$version.zip" else 
s"$version.changelog"
+    val filePathRocks = new File(basePath.toString, fileNameRocks)
+    filePathHDFS.exists || filePathRocks.exists
   }
 
   def updateVersionTo(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to