This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 7b1346b401d6 [SPARK-51397][SS] Fix maintenance pool shutdown handling issue causing long test times

7b1346b401d6 is described below

commit 7b1346b401d6769c9e2a8ad9262e39754fe1d132
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Fri Mar 14 13:52:27 2025 +0900

[SPARK-51397][SS] Fix maintenance pool shutdown handling issue causing long test times

### What changes were proposed in this pull request?

Fix a maintenance pool shutdown handling issue that was causing long test times.

### Why are the changes needed?

Some of the snapshot lag verification tests were taking a long time. This was because the maintenance thread pool shutdown was getting stuck on a driver RPC to the state store coordinator, which could have already been destroyed.

```
Exception in thread "state-store-maintenance-thread-3" Exception in thread "state-store-maintenance-thread-0" java.lang.InterruptedException
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1081)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:248)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
    at org.apache.spark.util.SparkThreadUtils$.awaitResultNoSparkExceptionConversion(SparkThreadUtils.scala:60)
    at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:45)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:519)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:107)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
    at org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.verifyIfInstanceActive(StateStoreCoordinator.scala:161)
    at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$verifyIfStoreInstanceActive$1(StateStore.scala:1293)
    at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$verifyIfStoreInstanceActive$1$adapted(StateStore.scala:1293)
    at scala.Option.map(Option.scala:230)
```

Before the change: > 6m

```
23:40:57.292 WARN org.apache.spark.scheduler.DAGScheduler: Failed to cancel job group 6f26b37e-1743-45e1-ab42-d85c5b0c6ded. Cannot find active jobs for it.
23:40:57.295 WARN org.apache.spark.scheduler.DAGScheduler: Failed to cancel job group 6f26b37e-1743-45e1-ab42-d85c5b0c6ded. Cannot find active jobs for it.
[info] *** Test still running after 4 minutes, 58 seconds: suite name: RocksDBStateStoreIntegrationSuite, test name: SPARK-51097: Verify snapshot lag metrics are updated correctly with RocksDBStateStoreProvider (with changelog checkpointing).
```

After the change: ~21s

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50168 from anishshri-db/task/SPARK-51397.
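The essence of the fix is bounding the graceful-shutdown wait with a configurable timeout before forcing the pool down. The following REPL-style Scala snippet is an illustrative, simplified stand-in for that pattern only; the class and parameter names are made up here and are not Spark's actual MaintenanceThreadPool, which uses ThreadUtils and structured logging.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Simplified stand-in for a maintenance pool: graceful shutdown bounded by a
// configurable timeout, then a forced shutdown that interrupts stuck tasks
// (for example a thread blocked on an RPC to a coordinator that no longer exists).
class SimpleMaintenancePool(numThreads: Int, shutdownTimeoutSecs: Long) {
  private val threadPool = Executors.newFixedThreadPool(numThreads)

  def execute(task: Runnable): Unit = threadPool.execute(task)

  def stop(): Unit = {
    threadPool.shutdown() // stop accepting new tasks
    // Wait up to the configured timeout instead of a hard-coded 300 seconds.
    if (!threadPool.awaitTermination(shutdownTimeoutSecs, TimeUnit.SECONDS)) {
      threadPool.shutdownNow() // interrupt tasks that are still running
      threadPool.awaitTermination(shutdownTimeoutSecs, TimeUnit.SECONDS)
    }
  }
}

// A pool that gives up on graceful shutdown after 5 seconds.
val pool = new SimpleMaintenancePool(numThreads = 2, shutdownTimeoutSecs = 5L)
pool.execute(() => Thread.sleep(60000)) // simulated long-running maintenance task
pool.stop() // returns after roughly 5 seconds, not 5 minutes
```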
Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../scala/org/apache/spark/sql/internal/SQLConf.scala       |  9 +++++++++
 .../spark/sql/execution/streaming/state/StateStore.scala    | 15 ++++++++++-----
 .../sql/execution/streaming/state/StateStoreConf.scala      |  5 +++++
 3 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6216960b2788..ef213f115aa3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2209,6 +2209,13 @@ object SQLConf {
     .checkValue(_ > 0, "Must be greater than 0")
     .createWithDefault(Math.max(Runtime.getRuntime.availableProcessors() / 4, 1))
 
+  val STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT =
+    buildConf("spark.sql.streaming.stateStore.maintenanceShutdownTimeout")
+      .internal()
+      .doc("Timeout in seconds for maintenance pool operations to complete on shutdown")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefault(300L)
+
   val STATE_SCHEMA_CHECK_ENABLED = buildConf("spark.sql.streaming.stateStore.stateSchemaCheck")
     .doc("When true, Spark will validate the state schema against schema on existing state and " +
@@ -5768,6 +5775,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def numStateStoreMaintenanceThreads: Int = getConf(NUM_STATE_STORE_MAINTENANCE_THREADS)
 
+  def stateStoreMaintenanceShutdownTimeout: Long = getConf(STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT)
+
   def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
 
   def stateStoreFormatValidationEnabled: Boolean = getConf(STATE_STORE_FORMAT_VALIDATION_ENABLED)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 8ba3fc37162c..928e2b0e9b99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -745,7 +745,9 @@ object StateStore extends Logging {
    * Thread Pool that runs maintenance on partitions that are scheduled by
    * MaintenanceTask periodically
    */
-  class MaintenanceThreadPool(numThreads: Int) {
+  class MaintenanceThreadPool(
+      numThreads: Int,
+      shutdownTimeout: Long) {
     private val threadPool = ThreadUtils.newDaemonFixedThreadPool(
       numThreads, "state-store-maintenance-thread")
 
@@ -758,10 +760,11 @@ object StateStore extends Logging {
       threadPool.shutdown() // Disable new tasks from being submitted
 
       // Wait a while for existing tasks to terminate
-      if (!threadPool.awaitTermination(5 * 60, TimeUnit.SECONDS)) {
+      if (!threadPool.awaitTermination(shutdownTimeout, TimeUnit.SECONDS)) {
         logWarning(
-          s"MaintenanceThreadPool is not able to be terminated within 300 seconds," +
-            " forcefully shutting down now.")
+          log"MaintenanceThreadPool failed to terminate within " +
+            log"waitTimeout=${MDC(LogKeys.TIMEOUT, shutdownTimeout)} seconds, " +
+            log"forcefully shutting down now.")
         threadPool.shutdownNow() // Cancel currently executing tasks
 
         // Wait a while for tasks to respond to being cancelled
@@ -917,13 +920,15 @@ object StateStore extends Logging {
 
   /** Start the periodic maintenance task if not already started and if Spark active */
   private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit = {
     val numMaintenanceThreads = storeConf.numStateStoreMaintenanceThreads
+    val maintenanceShutdownTimeout = storeConf.stateStoreMaintenanceShutdownTimeout
     loadedProviders.synchronized {
       if (SparkEnv.get != null && !isMaintenanceRunning) {
         maintenanceTask = new MaintenanceTask(
           storeConf.maintenanceInterval,
           task = { doMaintenance() }
         )
-        maintenanceThreadPool = new MaintenanceThreadPool(numMaintenanceThreads)
+        maintenanceThreadPool = new MaintenanceThreadPool(numMaintenanceThreads,
+          maintenanceShutdownTimeout)
         logInfo("State Store maintenance task started")
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index 9d26bf8fdf2e..807534ee4569 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -33,6 +33,11 @@ class StateStoreConf(
    */
  val numStateStoreMaintenanceThreads: Int = sqlConf.numStateStoreMaintenanceThreads
 
+  /**
+   * Timeout for state store maintenance operations to complete on shutdown
+   */
+  val stateStoreMaintenanceShutdownTimeout: Long = sqlConf.stateStoreMaintenanceShutdownTimeout
+
   /**
    * Minimum number of delta files in a chain after which HDFSBackedStateStore will
    * consider generating a snapshot.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
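For context on how the new knob added by this patch would be exercised: it is exposed only as an internal SQL conf, so it does not appear in user-facing docs, but it can still be set by key. The snippet below is a hypothetical sketch (not taken from the PR or its tests), assuming a local SparkSession is being created for a streaming test.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical test setup: build a local session and lower the new internal conf so
// maintenance-pool shutdown gives up after 30 seconds instead of the 300-second default.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("maintenance-shutdown-timeout-demo")
  .getOrCreate()

// "30s" is parsed by the conf's time parser (the conf is declared with TimeUnit.SECONDS).
spark.conf.set("spark.sql.streaming.stateStore.maintenanceShutdownTimeout", "30s")
```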