This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new de7ba3b2a8b6 [SPARK-53631][CORE] Optimize memory and perf on SHS bootstrap
de7ba3b2a8b6 is described below
commit de7ba3b2a8b6c9e4d6b697439243ae06623c622c
Author: Cheng Pan <[email protected]>
AuthorDate: Tue Sep 23 21:34:13 2025 -0700
[SPARK-53631][CORE] Optimize memory and perf on SHS bootstrap
### What changes were proposed in this pull request?
Core ideas:
1. Change the log replay thread pool to use a bounded queue, and block
task submission when the queue is full.
Currently, the log replay thread pool uses an unbounded queue: when
there are a large number (e.g., millions) of event logs under
`spark.history.fs.logDirectory`, all tasks get queued in the thread pool's
queue without ever blocking the scanning thread, and the next scheduled scan
enqueues them again ... (a sketch of this idea follows the list)
https://stackoverflow.com/questions/4521983/java-executorservice-that-blocks-on-submission-after-a-certain-queue-size
2. Move log compaction to a dedicated thread pool.
Replaying and compaction are different types of workloads; isolating
them from each other can improve resource utilization.
### Why are the changes needed?
Improve performance and reduce memory usage during SHS bootstrap with an empty
KV cache, when there are tons of event logs.
### Does this PR introduce _any_ user-facing change?
No functional changes, but this introduces a new config
`spark.history.fs.numCompactThreads`.
### How was this patch tested?
Tested on an internal cluster by starting the SHS with an empty
`spark.history.store.path` and ~650k event logs under
`spark.history.fs.logDirectory`. The related configs are:
```
spark.history.fs.cleaner.maxNum 650000
spark.history.fs.logDirectory hdfs://foo/spark2-history
spark.history.fs.update.interval 5s
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.store.maxDiskUsage 100GB
spark.history.store.path /foo/bar/historyStore
spark.history.fs.numReplayThreads 64
spark.history.fs.numCompactThreads 4
spark.history.store.hybridStore.enabled true
spark.history.store.hybridStore.maxMemoryUsage 16g
spark.history.store.hybridStore.diskBackend ROCKSDB
```
- `spark.history.store.path` is configured to an HDD path
- we disable `spark.eventLog.rolling.enabled`, so `numCompactThreads` has no heavy work to do
It's much faster than before, and metrics show better CPU utilization and
lower memory usage.
<img width="2546" height="480" alt="bf51f797a11527ce82036669f96cf50b" src="https://github.com/user-attachments/assets/4db521b0-cf1c-4b93-a06d-27fdaf1ccec4" />
(before vs. after; the 3rd figure is "memory used")
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52382 from pan3793/SPARK-53631.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../java/org/apache/spark/internal/LogKeys.java | 1 +
.../spark/deploy/history/FsHistoryProvider.scala | 39 +++++--
.../org/apache/spark/internal/config/History.scala | 6 +
.../util/BlockingThreadPoolExecutorService.scala | 124 +++++++++++++++++++++
.../scala/org/apache/spark/util/ThreadUtils.scala | 17 +++
.../org/apache/spark/util/ThreadUtilsSuite.scala | 34 ++++++
docs/monitoring.md | 8 ++
7 files changed, 218 insertions(+), 11 deletions(-)
diff --git
a/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
b/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
index 0fd3627bbac1..e90683a20575 100644
--- a/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
+++ b/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
@@ -328,6 +328,7 @@ public enum LogKeys implements LogKey {
LAST_ACCESS_TIME,
LAST_COMMITTED_CHECKPOINT_ID,
LAST_COMMIT_BASED_CHECKPOINT_ID,
+ LAST_SCAN_TIME,
LAST_VALID_TIME,
LATEST_BATCH_ID,
LATEST_COMMITTED_BATCH_ID,
diff --git
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 0c6d6acf66c8..4863291b529b 100644
---
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -102,7 +102,9 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
private val CLEAN_INTERVAL_S = conf.get(History.CLEANER_INTERVAL_S)
// Number of threads used to replay event logs.
- private val NUM_PROCESSING_THREADS = conf.get(History.NUM_REPLAY_THREADS)
+ private val numReplayThreads = conf.get(History.NUM_REPLAY_THREADS)
+ // Number of threads used to compact rolling event logs.
+ private val numCompactThreads = conf.get(History.NUM_COMPACT_THREADS)
private val logDir = conf.get(History.HISTORY_LOG_DIR)
@@ -209,7 +211,20 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
*/
private val replayExecutor: ExecutorService = {
if (!Utils.isTesting) {
- ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS,
"log-replay-executor")
+ ThreadUtils.newDaemonBlockingThreadPoolExecutorService(
+ numReplayThreads, 1024, "log-replay-executor")
+ } else {
+ ThreadUtils.sameThreadExecutorService()
+ }
+ }
+
+ /**
+ * Fixed size thread pool to compact log files.
+ */
+ private val compactExecutor: ExecutorService = {
+ if (!Utils.isTesting) {
+ ThreadUtils.newDaemonBlockingThreadPoolExecutorService(
+ numCompactThreads, 1024, "log-compact-executor")
} else {
ThreadUtils.sameThreadExecutorService()
}
@@ -431,7 +446,7 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
initThread.interrupt()
initThread.join()
}
- Seq(pool, replayExecutor).foreach { executor =>
+ Seq(pool, replayExecutor, compactExecutor).foreach { executor =>
executor.shutdown()
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow()
@@ -487,7 +502,8 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
var count: Int = 0
try {
val newLastScanTime = clock.getTimeMillis()
- logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
+ logInfo(log"Scanning ${MDC(HISTORY_DIR, logDir)} with " +
+ log"lastScanTime=${MDC(LAST_SCAN_TIME, lastScanTime)}")
// Mark entries that are processing as not stale. Such entries do not
have a chance to be
// updated with the new 'lastProcessed' time and thus any entity that
completes processing
@@ -495,7 +511,7 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
// and will be deleted from the UI until the next 'checkForLogs' run.
val notStale = mutable.HashSet[String]()
val updated = Option(fs.listStatus(new Path(logDir)))
- .map(_.toImmutableArraySeq).getOrElse(Nil)
+ .map(_.toImmutableArraySeq).getOrElse(Seq.empty)
.filter { entry => isAccessible(entry.getPath) }
.filter { entry =>
if (isProcessing(entry.getPath)) {
@@ -612,11 +628,11 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
}
if (updated.nonEmpty) {
- logDebug(s"New/updated attempts found: ${updated.size}
${updated.map(_.rootPath)}")
+ logInfo(log"New/updated attempts found: ${MDC(NUM_ATTEMPT,
updated.size)}")
}
updated.foreach { entry =>
- submitLogProcessTask(entry.rootPath) { () =>
+ submitLogProcessTask(entry.rootPath, replayExecutor) { () =>
mergeApplicationListing(entry, newLastScanTime, true)
}
}
@@ -788,7 +804,7 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
// triggering another task for compaction task only if it succeeds
if (succeeded) {
- submitLogProcessTask(rootPath) { () => compact(reader) }
+ submitLogProcessTask(rootPath, compactExecutor) { () =>
compact(reader) }
}
}
}
@@ -1456,13 +1472,14 @@ private[history] class FsHistoryProvider(conf:
SparkConf, clock: Clock)
}
/** NOTE: 'task' should ensure it executes 'endProcessing' at the end */
- private def submitLogProcessTask(rootPath: Path)(task: Runnable): Unit = {
+ private def submitLogProcessTask(
+ rootPath: Path, pool: ExecutorService)(task: Runnable): Unit = {
try {
processing(rootPath)
- replayExecutor.submit(task)
+ pool.submit(task)
} catch {
// let the iteration over the updated entries break, since an exception
on
- // replayExecutor.submit (..) indicates the ExecutorService is unable
+ // pool.submit (..) indicates the ExecutorService is unable
// to take any more submissions at this time
case e: Exception =>
logError(s"Exception while submitting task", e)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala
b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index 8eaa37cceee9..d44d3c32dfef 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -228,6 +228,12 @@ private[spark] object History {
.intConf
.createWithDefaultFunction(() =>
Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+ val NUM_COMPACT_THREADS = ConfigBuilder("spark.history.fs.numCompactThreads")
+ .version("4.1.0")
+ .doc("Number of threads that will be used by history server to compact
event logs.")
+ .intConf
+ .createWithDefaultFunction(() =>
Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+
val RETAINED_APPLICATIONS =
ConfigBuilder("spark.history.retainedApplications")
.version("1.0.0")
.doc("The number of applications to retain UI data for in the cache. If
this cap is " +
diff --git
a/core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala
b/core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala
new file mode 100644
index 000000000000..506a83f5590e
--- /dev/null
+++
b/core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util
+import java.util.concurrent._
+
+import com.google.common.util.concurrent.Futures
+
+// scalastyle:off
+/**
+ * This thread pool executor throttles the submission of new tasks by using a
semaphore.
+ * Task submissions require permits; task completions release permits.
+ * <p>
+ * NOTE: [[invoke*]] methods are not supported, you should either use the
[[submit]] methods
+ * or the [[execute]] method.
+ * <p>
+ * This is inspired by
+ * <a
href="https://github.com/apache/incubator-retired-s4/blob/0.6.0-Final/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
+ * Apache S4 BlockingThreadPoolExecutorService</a>
+ */
+// scalastyle:on
+private[spark] class BlockingThreadPoolExecutorService(
+ nThreads: Int, workQueueSize: Int, threadFactory: ThreadFactory)
+ extends ExecutorService {
+
+ private val permits = new Semaphore(nThreads + workQueueSize)
+
+ private val workQueue = new LinkedBlockingQueue[Runnable](nThreads + workQueueSize)
+
+ private val delegate = new ThreadPoolExecutor(
+ nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory)
+
+ override def shutdown(): Unit = delegate.shutdown()
+
+ override def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()
+
+ override def isShutdown: Boolean = delegate.isShutdown
+
+ override def isTerminated: Boolean = delegate.isTerminated
+
+ override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean =
+ delegate.awaitTermination(timeout, unit)
+
+ override def submit[T](task: Callable[T]): Future[T] = {
+ try permits.acquire() catch {
+ case e: InterruptedException =>
+ Thread.currentThread.interrupt()
+ return Futures.immediateFailedFuture(e)
+ }
+ delegate.submit(new CallableWithPermitRelease(task))
+ }
+
+ override def submit[T](task: Runnable, result: T): Future[T] = {
+ try permits.acquire() catch {
+ case e: InterruptedException =>
+ Thread.currentThread.interrupt()
+ return Futures.immediateFailedFuture(e)
+ }
+ delegate.submit(new RunnableWithPermitRelease(task), result)
+ }
+
+ override def submit(task: Runnable): Future[_] = {
+ try permits.acquire() catch {
+ case e: InterruptedException =>
+ Thread.currentThread.interrupt()
+ return Futures.immediateFailedFuture(e)
+ }
+ delegate.submit(new RunnableWithPermitRelease(task))
+ }
+
+ override def execute(command: Runnable): Unit = {
+ try permits.acquire() catch {
+ case _: InterruptedException =>
+ Thread.currentThread.interrupt()
+ }
+ delegate.execute(new RunnableWithPermitRelease(command))
+ }
+
+ override def invokeAll[T](
+ tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] =
+ throw new UnsupportedOperationException("Not implemented")
+
+ override def invokeAll[T](
+ tasks: util.Collection[_ <: Callable[T]],
+ timeout: Long, unit: TimeUnit): util.List[Future[T]] =
+ throw new UnsupportedOperationException("Not implemented")
+
+ override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T =
+ throw new UnsupportedOperationException("Not implemented")
+
+ override def invokeAny[T](
+ tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit:
TimeUnit): T =
+ throw new UnsupportedOperationException("Not implemented")
+
+ /**
+ * Releases a permit after the task is executed.
+ */
+ private class RunnableWithPermitRelease(delegate: Runnable) extends Runnable
{
+ override def run(): Unit = try delegate.run() finally permits.release()
+ }
+
+ /**
+ * Releases a permit after the task is completed.
+ */
+ private class CallableWithPermitRelease[T](delegate: Callable[T]) extends
Callable[T] {
+ override def call(): T = try delegate.call() finally permits.release()
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index e9d14f904db4..d22e14d99265 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -188,6 +188,23 @@ private[spark] object ThreadUtils {
rejectedExecutionHandler)
}
+ /**
+ * Similar to newDaemonFixedThreadPool, but with a bounded work queue; task submission will
+ * be blocked when the queue is full.
+ *
+ * @param nThreads the number of threads in the pool
+ * @param workQueueSize the capacity of the queue to use for holding tasks before they are
+ * executed. Task submission will be blocked when the queue is full.
+ * @param prefix thread names are formatted as prefix-ID, where ID is a
unique, sequentially
+ * assigned integer.
+ * @return BlockingThreadPoolExecutorService
+ */
+ def newDaemonBlockingThreadPoolExecutorService(
+ nThreads: Int, workQueueSize: Int, prefix: String): ExecutorService = {
+ val threadFactory = namedThreadFactory(prefix)
+ new BlockingThreadPoolExecutorService(nThreads, workQueueSize,
threadFactory)
+ }
+
/**
* Wrapper over ScheduledThreadPoolExecutor the pool with daemon threads.
*/
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 04f661db691e..d74bc2699944 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -98,6 +98,40 @@ class ThreadUtilsSuite extends SparkFunSuite {
}
}
+ test("newDaemonBlockingThreadPoolExecutorService") {
+ val nThread = 3
+ val workQueueSize = 5
+ val submitThreadsLatch = new CountDownLatch(nThread + workQueueSize + 1)
+ val latch = new CountDownLatch(1)
+ val blockingPool = ThreadUtils.newDaemonBlockingThreadPoolExecutorService(
+ nThread, workQueueSize,
"ThreadUtilsSuite-newDaemonBlockingThreadPoolExecutorService")
+
+ try {
+ val submitThread = new Thread(() => {
+ (0 until nThread + workQueueSize + 1).foreach { i =>
+ blockingPool.execute(() => {
+ latch.await(10, TimeUnit.SECONDS)
+ })
+ submitThreadsLatch.countDown()
+ }
+ })
+ submitThread.setDaemon(true)
+ submitThread.start()
+
+ // the last task submission will be blocked until the previous tasks complete
+ eventually(timeout(10.seconds)) {
+ assert(submitThreadsLatch.getCount === 1L)
+ }
+ latch.countDown()
+ eventually(timeout(10.seconds)) {
+ assert(submitThreadsLatch.getCount === 0L)
+ assert(!submitThread.isAlive)
+ }
+ } finally {
+ blockingPool.shutdownNow()
+ }
+ }
+
test("sameThread") {
val callerThreadName = Thread.currentThread().getName()
val f = Future {
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 957ee555191a..49d04b328f29 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -329,6 +329,14 @@ Security options for the Spark History Server are covered
more detail in the
</td>
<td>2.0.0</td>
</tr>
+ <tr>
+ <td>spark.history.fs.numCompactThreads</td>
+ <td>25% of available cores</td>
+ <td>
+ Number of threads that will be used by history server to compact event
logs.
+ </td>
+ <td>4.1.0</td>
+ </tr>
<tr>
<td>spark.history.store.maxDiskUsage</td>
<td>10g</td>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]