This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b1346b401d6 [SPARK-51397][SS] Fix maintenance pool shutdown handling 
issue causing long test times
7b1346b401d6 is described below

commit 7b1346b401d6769c9e2a8ad9262e39754fe1d132
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Fri Mar 14 13:52:27 2025 +0900

    [SPARK-51397][SS] Fix maintenance pool shutdown handling issue causing long 
test times
    
    ### What changes were proposed in this pull request?
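    Fix the maintenance pool shutdown handling issue that caused long test times.
    The time to wait for the maintenance thread pool to terminate on shutdown,
    previously hard-coded to 5 minutes, is now read from the new internal config
    `spark.sql.streaming.stateStore.maintenanceShutdownTimeout` (in seconds,
    default 300) and passed to `MaintenanceThreadPool`.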
    
    ### Why are the changes needed?
    Some of the snapshot lag verification tests were taking a long time because
    the maintenance thread pool shutdown got stuck on a driver RPC to the
    coordinator, which may already have been destroyed.
    
    ```
    Exception in thread "state-store-maintenance-thread-3" Exception in thread 
"state-store-maintenance-thread-0" java.lang.InterruptedException
            at 
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1081)
            at 
scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:248)
            at 
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
            at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
            at 
org.apache.spark.util.SparkThreadUtils$.awaitResultNoSparkExceptionConversion(SparkThreadUtils.scala:60)
            at 
org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:45)
            at 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:519)
            at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
            at 
org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:107)
            at 
org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
            at 
org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.verifyIfInstanceActive(StateStoreCoordinator.scala:161)
            at 
org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$verifyIfStoreInstanceActive$1(StateStore.scala:1293)
            at 
org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$verifyIfStoreInstanceActive$1$adapted(StateStore.scala:1293)
            at scala.Option.map(Option.scala:230)
    ```
    
    Before the change: > 6m
    
    ```
    23:40:57.292 WARN org.apache.spark.scheduler.DAGScheduler: Failed to cancel 
job group 6f26b37e-1743-45e1-ab42-d85c5b0c6ded. Cannot find active jobs for it.
    23:40:57.295 WARN org.apache.spark.scheduler.DAGScheduler: Failed to cancel 
job group 6f26b37e-1743-45e1-ab42-d85c5b0c6ded. Cannot find active jobs for it.
    [info] *** Test still running after 4 minutes, 58 seconds: suite name: 
RocksDBStateStoreIntegrationSuite, test name: SPARK-51097: Verify snapshot lag 
metrics are updated correctly with RocksDBStateStoreProvider (with changelog 
checkpointing).
    ```
    
    After the change: ~21s
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #50168 from anishshri-db/task/SPARK-51397.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../scala/org/apache/spark/sql/internal/SQLConf.scala     |  9 +++++++++
 .../spark/sql/execution/streaming/state/StateStore.scala  | 15 ++++++++++-----
 .../sql/execution/streaming/state/StateStoreConf.scala    |  5 +++++
 3 files changed, 24 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6216960b2788..ef213f115aa3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2209,6 +2209,13 @@ object SQLConf {
       .checkValue(_ > 0, "Must be greater than 0")
       .createWithDefault(Math.max(Runtime.getRuntime.availableProcessors() / 
4, 1))
 
+  val STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT =
+    buildConf("spark.sql.streaming.stateStore.maintenanceShutdownTimeout")
+      .internal()
+      .doc("Timeout in seconds for maintenance pool operations to complete on 
shutdown")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefault(300L)
+
   val STATE_SCHEMA_CHECK_ENABLED =
     buildConf("spark.sql.streaming.stateStore.stateSchemaCheck")
       .doc("When true, Spark will validate the state schema against schema on 
existing state and " +
@@ -5768,6 +5775,8 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
 
   def numStateStoreMaintenanceThreads: Int = 
getConf(NUM_STATE_STORE_MAINTENANCE_THREADS)
 
+  def stateStoreMaintenanceShutdownTimeout: Long = 
getConf(STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT)
+
   def stateStoreMinDeltasForSnapshot: Int = 
getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
 
   def stateStoreFormatValidationEnabled: Boolean = 
getConf(STATE_STORE_FORMAT_VALIDATION_ENABLED)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 8ba3fc37162c..928e2b0e9b99 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -745,7 +745,9 @@ object StateStore extends Logging {
    * Thread Pool that runs maintenance on partitions that are scheduled by
    * MaintenanceTask periodically
    */
-  class MaintenanceThreadPool(numThreads: Int) {
+  class MaintenanceThreadPool(
+      numThreads: Int,
+      shutdownTimeout: Long) {
     private val threadPool = ThreadUtils.newDaemonFixedThreadPool(
       numThreads, "state-store-maintenance-thread")
 
@@ -758,10 +760,11 @@ object StateStore extends Logging {
       threadPool.shutdown() // Disable new tasks from being submitted
 
       // Wait a while for existing tasks to terminate
-      if (!threadPool.awaitTermination(5 * 60, TimeUnit.SECONDS)) {
+      if (!threadPool.awaitTermination(shutdownTimeout, TimeUnit.SECONDS)) {
         logWarning(
-          s"MaintenanceThreadPool is not able to be terminated within 300 
seconds," +
-            " forcefully shutting down now.")
+          log"MaintenanceThreadPool failed to terminate within " +
+          log"waitTimeout=${MDC(LogKeys.TIMEOUT, shutdownTimeout)} seconds, " +
+          log"forcefully shutting down now.")
         threadPool.shutdownNow() // Cancel currently executing tasks
 
         // Wait a while for tasks to respond to being cancelled
@@ -917,13 +920,15 @@ object StateStore extends Logging {
   /** Start the periodic maintenance task if not already started and if Spark 
active */
   private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit = {
     val numMaintenanceThreads = storeConf.numStateStoreMaintenanceThreads
+    val maintenanceShutdownTimeout = 
storeConf.stateStoreMaintenanceShutdownTimeout
     loadedProviders.synchronized {
       if (SparkEnv.get != null && !isMaintenanceRunning) {
         maintenanceTask = new MaintenanceTask(
           storeConf.maintenanceInterval,
           task = { doMaintenance() }
         )
-        maintenanceThreadPool = new 
MaintenanceThreadPool(numMaintenanceThreads)
+        maintenanceThreadPool = new 
MaintenanceThreadPool(numMaintenanceThreads,
+          maintenanceShutdownTimeout)
         logInfo("State Store maintenance task started")
       }
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index 9d26bf8fdf2e..807534ee4569 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -33,6 +33,11 @@ class StateStoreConf(
    */
   val numStateStoreMaintenanceThreads: Int = 
sqlConf.numStateStoreMaintenanceThreads
 
+  /**
+   * Timeout for state store maintenance operations to complete on shutdown
+   */
+  val stateStoreMaintenanceShutdownTimeout: Long = 
sqlConf.stateStoreMaintenanceShutdownTimeout
+
   /**
    * Minimum number of delta files in a chain after which HDFSBackedStateStore 
will
    * consider generating a snapshot.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to