This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new aeae9ff7bfbb [SPARK-52737][CORE] Pushdown predicate and number of apps to FsHistoryProvider when listing applications aeae9ff7bfbb is described below commit aeae9ff7bfbbef574c047dd4d25c1cdb8667da96 Author: Shardul Mahadik <smaha...@linkedin.com> AuthorDate: Sat Jul 26 19:21:03 2025 +0800 [SPARK-52737][CORE] Pushdown predicate and number of apps to FsHistoryProvider when listing applications ### What changes were proposed in this pull request? SPARK-38896 modified how applications are listed from the KVStore to close the KVStore iterator eagerly [Link](https://github.com/apache/spark/pull/36237/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R328). This meant that `FsHistoryProvider.getListing` now eagerly goes through every application in the KVStore before returning an iterator to the caller. In a couple of contexts where `FsHistoryProvider.getListing` is used, this is very detrimental, e.g. [he [...] To fix the issue, while preserving the original intent of closing the iterator early, this PR proposes pushing down the filter predicate and the number of applications required to FsHistoryProvider. ### Why are the changes needed? To fix a performance regression in the Spark History Server (SHS) introduced by SPARK-38896. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests for `HistoryPage` and `ApplicationListResource`. Tested performance on a local SHS with a large number of apps (~75k), consistent with production. 
Before: ``` smahadiklocalhost [ ~ ]$ curl http://localhost:18080/api/v1/applications | jq 'length' 75061 smahadiklocalhost [ ~ ]$ for i in {1..10}; do curl -s -w "\nTotal time: %{time_total}s\n" -o /dev/null http://localhost:18080; done Total time: 3.607995s Total time: 3.564875s Total time: 3.095895s Total time: 3.153576s Total time: 3.157186s Total time: 3.251107s Total time: 3.681727s Total time: 4.622074s Total time: 6.866931s Total time: 3.523224s smahadiklocalhost [ ~ ]$ for i in {1..10}; do curl -s -w "\nTotal time: %{time_total}s\n" -o /dev/null http://localhost:18080/api/v1/applications?limit=10; done Total time: 3.340698s Total time: 3.206455s Total time: 3.140326s Total time: 4.704944s Total time: 3.982831s Total time: 7.375094s Total time: 3.328329s Total time: 3.264700s Total time: 3.283851s Total time: 3.456416s ``` After: ``` smahadiklocalhost [ ~ ]$ curl http://localhost:18080/api/v1/applications | jq 'length' % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 36.7M 0 36.7M 0 0 7662k 0 --:--:-- 0:00:04 --:--:-- 7663k 75077 smahadiklocalhost [ ~ ]$ for i in {1..10}; do curl -s -w "\nTotal time: %{time_total}s\n" -o /dev/null http://localhost:18080; done Total time: 0.224714s Total time: 0.012205s Total time: 0.014709s Total time: 0.008092s Total time: 0.007284s Total time: 0.006350s Total time: 0.005414s Total time: 0.006391s Total time: 0.005668s Total time: 0.004738s smahadiklocalhost [ ~ ]$ for i in {1..10}; do curl -s -w "\nTotal time: %{time_total}s\n" -o /dev/null http://localhost:18080/api/v1/applications?limit=10; done Total time: 1.439507s Total time: 0.015126s Total time: 0.009085s Total time: 0.007620s Total time: 0.007692s Total time: 0.007420s Total time: 0.007152s Total time: 0.010515s Total time: 0.011493s Total time: 0.007564s ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #51428 from shardulm94/smahadik/shs-slow. 
Authored-by: Shardul Mahadik <smaha...@linkedin.com> Signed-off-by: yangjie01 <yangji...@baidu.com> --- .../spark/deploy/history/ApplicationHistoryProvider.scala | 9 +++++++++ .../apache/spark/deploy/history/FsHistoryProvider.scala | 8 ++++++++ .../org/apache/spark/deploy/history/HistoryPage.scala | 2 +- .../org/apache/spark/deploy/history/HistoryServer.scala | 5 +++++ core/src/main/scala/org/apache/spark/status/KVUtils.scala | 14 ++++++++++++++ .../org/apache/spark/status/api/v1/ApiRootResource.scala | 4 ++++ .../spark/status/api/v1/ApplicationListResource.scala | 4 ++-- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 5 +++++ 8 files changed, 48 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index f3f7db6bb0ab..89f0d12935ce 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -99,6 +99,15 @@ private[history] abstract class ApplicationHistoryProvider { */ def getListing(): Iterator[ApplicationInfo] + /** + * Returns a list of applications available for the history server to show. + * + * @param max The maximum number of applications to return + * @param predicate A function that filters the applications to be returned + * @return An iterator of matching applications up to the specified maximum + */ + def getListing(max: Int)(predicate: ApplicationInfo => Boolean): Iterator[ApplicationInfo] + /** * Returns the Spark UI for a specific application. 
* diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 4aa6c7e40c54..4365e9228072 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -307,6 +307,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .index("endTime").reverse())(_.toApplicationInfo()).iterator } + override def getListing(max: Int)( + predicate: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = { + // Return the filtered listing in end time descending order. + KVUtils.mapToSeqWithFilter( + listing.view(classOf[ApplicationInfoWrapper]).index("endTime").reverse(), + max)(_.toApplicationInfo())(predicate).iterator + } + override def getApplicationInfo(appId: String): Option[ApplicationInfo] = { try { Some(load(appId).toApplicationInfo()) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index ff1629b69809..4eeddd7cc709 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -109,7 +109,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") } def shouldDisplayApplications(requestedIncomplete: Boolean): Boolean = { - parent.getApplicationList().exists(isApplicationCompleted(_) != requestedIncomplete) + parent.getApplicationInfoList(1)(isApplicationCompleted(_) != requestedIncomplete).nonEmpty } private def makePageLink(request: HttpServletRequest, showIncomplete: Boolean): String = { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index ce9f70c9a83e..aba66c319ca9 100644 --- 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -224,6 +224,11 @@ class HistoryServer( getApplicationList() } + override def getApplicationInfoList(max: Int)( + filter: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = { + provider.getListing(max)(filter) + } + def getApplicationInfo(appId: String): Option[ApplicationInfo] = { provider.getApplicationInfo(appId) } diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala index e334626413dc..49b77111abaf 100644 --- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala +++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala @@ -213,6 +213,20 @@ private[spark] object KVUtils extends Logging { } } + /** + * Maps all values of KVStoreView to new values using a transformation function + * and filtered by a filter function. + */ + def mapToSeqWithFilter[T, B]( + view: KVStoreView[T], + max: Int) + (mapFunc: T => B) + (filterFunc: B => Boolean): Seq[B] = { + Utils.tryWithResource(view.closeableIterator()) { iter => + iter.asScala.map(mapFunc).filter(filterFunc).take(max).toList + } + } + def size[T](view: KVStoreView[T]): Int = { Utils.tryWithResource(view.closeableIterator()) { iter => iter.asScala.size diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala index 66fac8a9d105..61e800844db0 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala @@ -82,6 +82,10 @@ private[spark] trait UIRoot { def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T def getApplicationInfoList: Iterator[ApplicationInfo] + + def getApplicationInfoList(max: Int)( + filter: ApplicationInfo => Boolean): 
Iterator[ApplicationInfo] + def getApplicationInfo(appId: String): Option[ApplicationInfo] /** diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index 2d5dd97b501f..aaaa08b3340b 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -38,7 +38,7 @@ private[v1] class ApplicationListResource extends ApiRequestContext { val includeCompleted = status.isEmpty || status.contains(ApplicationStatus.COMPLETED) val includeRunning = status.isEmpty || status.contains(ApplicationStatus.RUNNING) - uiRoot.getApplicationInfoList.filter { app => + uiRoot.getApplicationInfoList(numApps) { app => val anyRunning = app.attempts.isEmpty || !app.attempts.head.completed // if any attempt is still running, we consider the app to also still be running; // keep the app if *any* attempts fall in the right time window @@ -46,7 +46,7 @@ private[v1] class ApplicationListResource extends ApiRequestContext { app.attempts.exists { attempt => isAttemptInRange(attempt, minDate, maxDate, minEndDate, maxEndDate, anyRunning) } - }.take(numApps) + } } private def isAttemptInRange( diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index b8d422c9d9fb..c6a0b6ae27ce 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -201,6 +201,11 @@ private[spark] class SparkUI private ( )) } + override def getApplicationInfoList(max: Int)( + filter: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = { + getApplicationInfoList.filter(filter).take(max) + } + def getApplicationInfo(appId: String): Option[ApplicationInfo] = { getApplicationInfoList.find(_.id == appId) } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org