This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new de7ba3b2a8b6 [SPARK-53631][CORE] Optimize memory and perf on SHS bootstrap
de7ba3b2a8b6 is described below

commit de7ba3b2a8b6c9e4d6b697439243ae06623c622c
Author: Cheng Pan <[email protected]>
AuthorDate: Tue Sep 23 21:34:13 2025 -0700

    [SPARK-53631][CORE] Optimize memory and perf on SHS bootstrap
    
    ### What changes were proposed in this pull request?
    
    Core ideas:
    
    1. Change the log replay thread pool to have a bounded queue, and block task submission when the queue is full.
    
       Currently, the log replay thread pool uses an unbounded queue. When there are a large number (e.g., millions) of event logs under `spark.history.fs.logDirectory`, all tasks get queued in the thread pool queue without blocking the scanning thread, and on the next scheduled scan they are enqueued again ...

       https://stackoverflow.com/questions/4521983/java-executorservice-that-blocks-on-submission-after-a-certain-queue-size
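
       Below is a minimal sketch of the blocking-submission idea (illustrative only, not the exact new class): a semaphore sized to `nThreads + queueSize` gates submissions, so the scanning thread blocks instead of piling up an unbounded backlog.

       ```
       import java.util.concurrent.{LinkedBlockingQueue, Semaphore, ThreadPoolExecutor, TimeUnit}

       val nThreads = 2
       val queueSize = 4
       // One permit per worker thread plus one per queue slot.
       val permits = new Semaphore(nThreads + queueSize)
       val pool = new ThreadPoolExecutor(
         nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue[Runnable](nThreads + queueSize))

       def submitBlocking(task: Runnable): Unit = {
         permits.acquire() // blocks while nThreads + queueSize tasks are in flight
         pool.execute(() => try task.run() finally permits.release())
       }

       (1 to 100).foreach { _ => submitBlocking(() => Thread.sleep(10)) }
       pool.shutdown()
       pool.awaitTermination(10, TimeUnit.SECONDS)
       ```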
    
    2. Move log compaction to a dedicated thread pool.
    
       Replaying and compaction are different types of workloads; isolating them from each other can improve resource utilization.
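
       As a sketch of the isolation (illustrative names and sizes, not the actual SHS wiring), each workload gets its own fixed-size pool, so a burst of replay tasks cannot starve compaction and vice versa:

       ```
       import java.util.concurrent.{Executors, TimeUnit}

       val replayPool = Executors.newFixedThreadPool(64) // heavy: replays event logs
       val compactPool = Executors.newFixedThreadPool(4) // light: compacts rolling logs

       replayPool.execute(() => println("replaying an event log"))
       compactPool.execute(() => println("compacting a rolling event log"))

       Seq(replayPool, compactPool).foreach { pool =>
         pool.shutdown()
         pool.awaitTermination(5, TimeUnit.SECONDS)
       }
       ```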
    
    ### Why are the changes needed?
    
    Improve performance and reduce memory usage during SHS bootstrap with an empty KV cache, when there are a huge number of event logs.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No functionality changes, but it introduces a new config `spark.history.fs.numCompactThreads`.
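
    For example (an illustrative `spark-defaults.conf` snippet, values taken from the test setup below), the new knob sits alongside the existing replay thread count:

    ```
    spark.history.fs.numReplayThreads  64
    spark.history.fs.numCompactThreads 4
    ```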
    
    ### How was this patch tested?
    
    Tested on an internal cluster, starting SHS with an empty `spark.history.store.path` and ~650k event logs under `spark.history.fs.logDirectory`. The related configs are
    
    ```
    spark.history.fs.cleaner.maxNum 650000
    spark.history.fs.logDirectory hdfs://foo/spark2-history
    spark.history.fs.update.interval 5s
    spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
    spark.history.store.maxDiskUsage 100GB
    spark.history.store.path /foo/bar/historyStore
    spark.history.fs.numReplayThreads 64
    spark.history.fs.numCompactThreads 4
    spark.history.store.hybridStore.enabled true
    spark.history.store.hybridStore.maxMemoryUsage 16g
    spark.history.store.hybridStore.diskBackend ROCKSDB
    ```
    - `spark.history.store.path` is configured to an HDD path
    - we disable `spark.eventLog.rolling.enabled` so `numCompactThreads` has no heavy work
    
    It's much faster than before, and metrics show better CPU utilization and lower memory usage.
    
    <img width="2546" height="480" alt="bf51f797a11527ce82036669f96cf50b" src="https://github.com/user-attachments/assets/4db521b0-cf1c-4b93-a06d-27fdaf1ccec4" />
    (before vs. after; the 3rd figure is "memory used")
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52382 from pan3793/SPARK-53631.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../java/org/apache/spark/internal/LogKeys.java    |   1 +
 .../spark/deploy/history/FsHistoryProvider.scala   |  39 +++++--
 .../org/apache/spark/internal/config/History.scala |   6 +
 .../util/BlockingThreadPoolExecutorService.scala   | 124 +++++++++++++++++++++
 .../scala/org/apache/spark/util/ThreadUtils.scala  |  17 +++
 .../org/apache/spark/util/ThreadUtilsSuite.scala   |  34 ++++++
 docs/monitoring.md                                 |   8 ++
 7 files changed, 218 insertions(+), 11 deletions(-)

diff --git a/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java b/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
index 0fd3627bbac1..e90683a20575 100644
--- a/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
+++ b/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
@@ -328,6 +328,7 @@ public enum LogKeys implements LogKey {
   LAST_ACCESS_TIME,
   LAST_COMMITTED_CHECKPOINT_ID,
   LAST_COMMIT_BASED_CHECKPOINT_ID,
+  LAST_SCAN_TIME,
   LAST_VALID_TIME,
   LATEST_BATCH_ID,
   LATEST_COMMITTED_BATCH_ID,
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 0c6d6acf66c8..4863291b529b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -102,7 +102,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private val CLEAN_INTERVAL_S = conf.get(History.CLEANER_INTERVAL_S)
 
   // Number of threads used to replay event logs.
-  private val NUM_PROCESSING_THREADS = conf.get(History.NUM_REPLAY_THREADS)
+  private val numReplayThreads = conf.get(History.NUM_REPLAY_THREADS)
+  // Number of threads used to compact rolling event logs.
+  private val numCompactThreads = conf.get(History.NUM_COMPACT_THREADS)
 
   private val logDir = conf.get(History.HISTORY_LOG_DIR)
 
@@ -209,7 +211,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    */
   private val replayExecutor: ExecutorService = {
     if (!Utils.isTesting) {
-      ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS, "log-replay-executor")
+      ThreadUtils.newDaemonBlockingThreadPoolExecutorService(
+        numReplayThreads, 1024, "log-replay-executor")
+    } else {
+      ThreadUtils.sameThreadExecutorService()
+    }
+  }
+
+  /**
+   * Fixed size thread pool to compact log files.
+   */
+  private val compactExecutor: ExecutorService = {
+    if (!Utils.isTesting) {
+      ThreadUtils.newDaemonBlockingThreadPoolExecutorService(
+        numCompactThreads, 1024, "log-compact-executor")
     } else {
       ThreadUtils.sameThreadExecutorService()
     }
@@ -431,7 +446,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         initThread.interrupt()
         initThread.join()
       }
-      Seq(pool, replayExecutor).foreach { executor =>
+      Seq(pool, replayExecutor, compactExecutor).foreach { executor =>
         executor.shutdown()
         if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
           executor.shutdownNow()
@@ -487,7 +502,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     var count: Int = 0
     try {
       val newLastScanTime = clock.getTimeMillis()
-      logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
+      logInfo(log"Scanning ${MDC(HISTORY_DIR, logDir)} with " +
+        log"lastScanTime=${MDC(LAST_SCAN_TIME, lastScanTime)}")
 
       // Mark entries that are processing as not stale. Such entries do not have a chance to be
       // updated with the new 'lastProcessed' time and thus any entity that completes processing
@@ -495,7 +511,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       // and will be deleted from the UI until the next 'checkForLogs' run.
       val notStale = mutable.HashSet[String]()
       val updated = Option(fs.listStatus(new Path(logDir)))
-        .map(_.toImmutableArraySeq).getOrElse(Nil)
+        .map(_.toImmutableArraySeq).getOrElse(Seq.empty)
         .filter { entry => isAccessible(entry.getPath) }
         .filter { entry =>
           if (isProcessing(entry.getPath)) {
@@ -612,11 +628,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         }
 
       if (updated.nonEmpty) {
-        logDebug(s"New/updated attempts found: ${updated.size} 
${updated.map(_.rootPath)}")
+        logInfo(log"New/updated attempts found: ${MDC(NUM_ATTEMPT, 
updated.size)}")
       }
 
       updated.foreach { entry =>
-        submitLogProcessTask(entry.rootPath) { () =>
+        submitLogProcessTask(entry.rootPath, replayExecutor) { () =>
           mergeApplicationListing(entry, newLastScanTime, true)
         }
       }
@@ -788,7 +804,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
       // triggering another task for compaction task only if it succeeds
       if (succeeded) {
-        submitLogProcessTask(rootPath) { () => compact(reader) }
+        submitLogProcessTask(rootPath, compactExecutor) { () => compact(reader) }
       }
     }
   }
@@ -1456,13 +1472,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   /** NOTE: 'task' should ensure it executes 'endProcessing' at the end */
-  private def submitLogProcessTask(rootPath: Path)(task: Runnable): Unit = {
+  private def submitLogProcessTask(
+      rootPath: Path, pool: ExecutorService)(task: Runnable): Unit = {
     try {
       processing(rootPath)
-      replayExecutor.submit(task)
+      pool.submit(task)
     } catch {
      // let the iteration over the updated entries break, since an exception on
-      // replayExecutor.submit (..) indicates the ExecutorService is unable
+      // pool.submit (..) indicates the ExecutorService is unable
       // to take any more submissions at this time
       case e: Exception =>
         logError(s"Exception while submitting task", e)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index 8eaa37cceee9..d44d3c32dfef 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -228,6 +228,12 @@ private[spark] object History {
     .intConf
     .createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
 
+  val NUM_COMPACT_THREADS = ConfigBuilder("spark.history.fs.numCompactThreads")
+    .version("4.1.0")
+    .doc("Number of threads that will be used by history server to compact 
event logs.")
+    .intConf
+    .createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+
   val RETAINED_APPLICATIONS = ConfigBuilder("spark.history.retainedApplications")
     .version("1.0.0")
     .doc("The number of applications to retain UI data for in the cache. If this cap is " +
diff --git a/core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala b/core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala
new file mode 100644
index 000000000000..506a83f5590e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util
+import java.util.concurrent._
+
+import com.google.common.util.concurrent.Futures
+
+// scalastyle:off
+/**
+ * This thread pool executor throttles the submission of new tasks by using a semaphore.
+ * Task submissions require permits, task completions release permits.
+ * <p>
+ * NOTE: [[invoke*]] methods are not supported, you should either use the [[submit]] methods
+ * or the [[execute]] method.
+ * <p>
+ * This is inspired by
+ * <a href="https://github.com/apache/incubator-retired-s4/blob/0.6.0-Final/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
+ * Apache S4 BlockingThreadPoolExecutorService</a>
+ */
+// scalastyle:on
+private[spark] class BlockingThreadPoolExecutorService(
+    nThreads: Int, workQueueSize: Int, threadFactory: ThreadFactory)
+  extends ExecutorService {
+
+  private val permits = new Semaphore(nThreads + workQueueSize)
+
+  private val workQueue = new LinkedBlockingQueue[Runnable](nThreads + workQueueSize)
+
+  private val delegate = new ThreadPoolExecutor(
+    nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory)
+
+  override def shutdown(): Unit = delegate.shutdown()
+
+  override def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()
+
+  override def isShutdown: Boolean = delegate.isShutdown
+
+  override def isTerminated: Boolean = delegate.isTerminated
+
+  override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean =
+    delegate.awaitTermination(timeout, unit)
+
+  override def submit[T](task: Callable[T]): Future[T] = {
+    try permits.acquire() catch {
+      case e: InterruptedException =>
+        Thread.currentThread.interrupt()
+        return Futures.immediateFailedFuture(e)
+    }
+    delegate.submit(new CallableWithPermitRelease(task))
+  }
+
+  override def submit[T](task: Runnable, result: T): Future[T] = {
+    try permits.acquire() catch {
+      case e: InterruptedException =>
+        Thread.currentThread.interrupt()
+        return Futures.immediateFailedFuture(e)
+    }
+    delegate.submit(new RunnableWithPermitRelease(task), result)
+  }
+
+  override def submit(task: Runnable): Future[_] = {
+    try permits.acquire() catch {
+      case e: InterruptedException =>
+        Thread.currentThread.interrupt()
+        return Futures.immediateFailedFuture(e)
+    }
+    delegate.submit(new RunnableWithPermitRelease(task))
+  }
+
+  override def execute(command: Runnable): Unit = {
+    try permits.acquire() catch {
+      case _: InterruptedException =>
+        Thread.currentThread.interrupt()
+    }
+    delegate.execute(new RunnableWithPermitRelease(command))
+  }
+
+  override def invokeAll[T](
+      tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] =
+    throw new UnsupportedOperationException("Not implemented")
+
+  override def invokeAll[T](
+      tasks: util.Collection[_ <: Callable[T]],
+      timeout: Long, unit: TimeUnit): util.List[Future[T]] =
+    throw new UnsupportedOperationException("Not implemented")
+
+  override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T =
+    throw new UnsupportedOperationException("Not implemented")
+
+  override def invokeAny[T](
+      tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T =
+    throw new UnsupportedOperationException("Not implemented")
+
+  /**
+   * Releases a permit after the task is executed.
+   */
+  private class RunnableWithPermitRelease(delegate: Runnable) extends Runnable {
+    override def run(): Unit = try delegate.run() finally permits.release()
+  }
+
+  /**
+   * Releases a permit after the task is completed.
+   */
+  private class CallableWithPermitRelease[T](delegate: Callable[T]) extends Callable[T] {
+    override def call(): T = try delegate.call() finally permits.release()
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index e9d14f904db4..d22e14d99265 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -188,6 +188,23 @@ private[spark] object ThreadUtils {
       rejectedExecutionHandler)
   }
 
+  /**
+   * Similar to newDaemonFixedThreadPool, but with a bounded work queue; task submission
+   * will be blocked when the queue is full.
+   *
+   * @param nThreads the number of threads in the pool
+   * @param workQueueSize the capacity of the queue to use for holding tasks before they are
+   *                      executed. Task submission will be blocked when the queue is full.
+   * @param prefix thread names are formatted as prefix-ID, where ID is a unique, sequentially
+   *               assigned integer.
+   * @return BlockingThreadPoolExecutorService
+   */
+  def newDaemonBlockingThreadPoolExecutorService(
+      nThreads: Int, workQueueSize: Int, prefix: String): ExecutorService = {
+    val threadFactory = namedThreadFactory(prefix)
+    new BlockingThreadPoolExecutorService(nThreads, workQueueSize, threadFactory)
+  }
+
   /**
    * Wrapper over ScheduledThreadPoolExecutor the pool with daemon threads.
    */
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 04f661db691e..d74bc2699944 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -98,6 +98,40 @@ class ThreadUtilsSuite extends SparkFunSuite {
     }
   }
 
+  test("newDaemonBlockingThreadPoolExecutorService") {
+    val nThread = 3
+    val workQueueSize = 5
+    val submitThreadsLatch = new CountDownLatch(nThread + workQueueSize + 1)
+    val latch = new CountDownLatch(1)
+    val blockingPool = ThreadUtils.newDaemonBlockingThreadPoolExecutorService(
+      nThread, workQueueSize, "ThreadUtilsSuite-newDaemonBlockingThreadPoolExecutorService")
+
+    try {
+      val submitThread = new Thread(() => {
+        (0 until nThread + workQueueSize + 1).foreach { i =>
+          blockingPool.execute(() => {
+            latch.await(10, TimeUnit.SECONDS)
+          })
+          submitThreadsLatch.countDown()
+        }
+      })
+      submitThread.setDaemon(true)
+      submitThread.start()
+
+      // the last task submission will be blocked until earlier tasks complete
+      eventually(timeout(10.seconds)) {
+        assert(submitThreadsLatch.getCount === 1L)
+      }
+      latch.countDown()
+      eventually(timeout(10.seconds)) {
+        assert(submitThreadsLatch.getCount === 0L)
+        assert(!submitThread.isAlive)
+      }
+    } finally {
+      blockingPool.shutdownNow()
+    }
+  }
+
   test("sameThread") {
     val callerThreadName = Thread.currentThread().getName()
     val f = Future {
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 957ee555191a..49d04b328f29 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -329,6 +329,14 @@ Security options for the Spark History Server are covered more detail in the
     </td>
     <td>2.0.0</td>
   </tr>
+  <tr>
+    <td>spark.history.fs.numCompactThreads</td>
+    <td>25% of available cores</td>
+    <td>
+      Number of threads that will be used by history server to compact event logs.
+    </td>
+    <td>4.1.0</td>
+  </tr>
   <tr>
     <td>spark.history.store.maxDiskUsage</td>
     <td>10g</td>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
