This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new aeae9ff7bfbb [SPARK-52737][CORE] Pushdown predicate and number of apps to FsHistoryProvider when listing applications
aeae9ff7bfbb is described below

commit aeae9ff7bfbbef574c047dd4d25c1cdb8667da96
Author: Shardul Mahadik <smaha...@linkedin.com>
AuthorDate: Sat Jul 26 19:21:03 2025 +0800

    [SPARK-52737][CORE] Pushdown predicate and number of apps to FsHistoryProvider when listing applications
    
    ### What changes were proposed in this pull request?
    SPARK-38896 modified how applications are listed from the KVStore so that the KVStore iterator is closed eagerly ([Link](https://github.com/apache/spark/pull/36237/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R328)). This means that `FsHistoryProvider.getListing` now eagerly walks through every application in the KVStore before returning an iterator to the caller. In a couple of contexts where `FsHistoryProvider.getListing` is used, this is very detrimental, e.g. [he [...]
    
    To fix the issue while preserving the original intent of closing the iterator early, this PR proposes pushing down the filter predicate and the number of applications required to `FsHistoryProvider`, as sketched below.
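
    A hypothetical caller-side sketch (the `getListing(max)(predicate)` signature is the one added in the diff below; the predicate itself is illustrative):

    ```scala
    // With the pushdown, iteration stops as soon as `max` matching
    // applications have been seen, instead of materializing the full
    // listing and filtering afterwards.
    val firstCompleted: Iterator[ApplicationInfo] =
      provider.getListing(max = 1) { app =>
        // Treat an app as completed if its latest attempt has finished.
        app.attempts.headOption.exists(_.completed)
      }
    ```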
    
    ### Why are the changes needed?
    To fix a performance regression in the Spark History Server (SHS) introduced by SPARK-38896.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests for `HistoryPage` and `ApplicationListResource`
    
    Tested performance on a local SHS with a large number of apps (~75k), consistent with production.
    Before:
    ```
    smahadiklocalhost [ ~ ]$ curl http://localhost:18080/api/v1/applications | jq 'length'
    75061
    
    smahadiklocalhost [ ~ ]$ for i in {1..10}; do curl -s -w "\nTotal time: %{time_total}s\n" -o /dev/null http://localhost:18080; done
    Total time: 3.607995s
    Total time: 3.564875s
    Total time: 3.095895s
    Total time: 3.153576s
    Total time: 3.157186s
    Total time: 3.251107s
    Total time: 3.681727s
    Total time: 4.622074s
    Total time: 6.866931s
    Total time: 3.523224s
    
    smahadiklocalhost [ ~ ]$ for i in {1..10}; do curl -s -w "\nTotal time: %{time_total}s\n" -o /dev/null http://localhost:18080/api/v1/applications?limit=10; done
    Total time: 3.340698s
    Total time: 3.206455s
    Total time: 3.140326s
    Total time: 4.704944s
    Total time: 3.982831s
    Total time: 7.375094s
    Total time: 3.328329s
    Total time: 3.264700s
    Total time: 3.283851s
    Total time: 3.456416s
    ```
    
    After:
    ```
    smahadiklocalhost [ ~ ]$ curl http://localhost:18080/api/v1/applications | jq 'length'
      % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                     Dload  Upload   Total   Spent    Left  Speed
    100 36.7M    0 36.7M    0     0  7662k      0 --:--:--  0:00:04 --:--:-- 7663k
    75077
    
    smahadiklocalhost [ ~ ]$ for i in {1..10}; do curl -s -w "\nTotal time: %{time_total}s\n" -o /dev/null http://localhost:18080; done
    Total time: 0.224714s
    Total time: 0.012205s
    Total time: 0.014709s
    Total time: 0.008092s
    Total time: 0.007284s
    Total time: 0.006350s
    Total time: 0.005414s
    Total time: 0.006391s
    Total time: 0.005668s
    Total time: 0.004738s
    
    smahadiklocalhost [ ~ ]$ for i in {1..10}; do curl -s -w "\nTotal time: %{time_total}s\n" -o /dev/null http://localhost:18080/api/v1/applications?limit=10; done
    Total time: 1.439507s
    Total time: 0.015126s
    Total time: 0.009085s
    Total time: 0.007620s
    Total time: 0.007692s
    Total time: 0.007420s
    Total time: 0.007152s
    Total time: 0.010515s
    Total time: 0.011493s
    Total time: 0.007564s
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #51428 from shardulm94/smahadik/shs-slow.
    
    Authored-by: Shardul Mahadik <smaha...@linkedin.com>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../spark/deploy/history/ApplicationHistoryProvider.scala  |  9 +++++++++
 .../apache/spark/deploy/history/FsHistoryProvider.scala    |  8 ++++++++
 .../org/apache/spark/deploy/history/HistoryPage.scala      |  2 +-
 .../org/apache/spark/deploy/history/HistoryServer.scala    |  5 +++++
 core/src/main/scala/org/apache/spark/status/KVUtils.scala  | 14 ++++++++++++++
 .../org/apache/spark/status/api/v1/ApiRootResource.scala   |  4 ++++
 .../spark/status/api/v1/ApplicationListResource.scala      |  4 ++--
 core/src/main/scala/org/apache/spark/ui/SparkUI.scala      |  5 +++++
 8 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index f3f7db6bb0ab..89f0d12935ce 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -99,6 +99,15 @@ private[history] abstract class ApplicationHistoryProvider {
    */
   def getListing(): Iterator[ApplicationInfo]
 
+  /**
+   * Returns a list of applications available for the history server to show.
+   *
+   * @param max The maximum number of applications to return
+   * @param predicate A function that filters the applications to be returned
+   * @return An iterator of matching applications up to the specified maximum
+   */
+  def getListing(max: Int)(predicate: ApplicationInfo => Boolean): Iterator[ApplicationInfo]
+
   /**
    * Returns the Spark UI for a specific application.
    *
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 4aa6c7e40c54..4365e9228072 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -307,6 +307,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .index("endTime").reverse())(_.toApplicationInfo()).iterator
   }
 
+  override def getListing(max: Int)(
+      predicate: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = {
+    // Return the filtered listing in end time descending order.
+    KVUtils.mapToSeqWithFilter(
+      listing.view(classOf[ApplicationInfoWrapper]).index("endTime").reverse(),
+      max)(_.toApplicationInfo())(predicate).iterator
+  }
+
   override def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
     try {
       Some(load(appId).toApplicationInfo())
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index ff1629b69809..4eeddd7cc709 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -109,7 +109,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
   }
 
   def shouldDisplayApplications(requestedIncomplete: Boolean): Boolean = {
-    parent.getApplicationList().exists(isApplicationCompleted(_) != requestedIncomplete)
+    parent.getApplicationInfoList(1)(isApplicationCompleted(_) != requestedIncomplete).nonEmpty
   }
 
   private def makePageLink(request: HttpServletRequest, showIncomplete: Boolean): String = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index ce9f70c9a83e..aba66c319ca9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -224,6 +224,11 @@ class HistoryServer(
     getApplicationList()
   }
 
+  override def getApplicationInfoList(max: Int)(
+      filter: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = {
+    provider.getListing(max)(filter)
+  }
+
   def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
     provider.getApplicationInfo(appId)
   }
diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
index e334626413dc..49b77111abaf 100644
--- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
+++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -213,6 +213,20 @@ private[spark] object KVUtils extends Logging {
     }
   }
 
+  /**
+   * Maps the values of a KVStoreView to new values using a transformation function,
+   * keeping only the mapped values that pass a filter function, up to `max` results.
+   */
+  def mapToSeqWithFilter[T, B](
+      view: KVStoreView[T],
+      max: Int)
+      (mapFunc: T => B)
+      (filterFunc: B => Boolean): Seq[B] = {
+    Utils.tryWithResource(view.closeableIterator()) { iter =>
+      iter.asScala.map(mapFunc).filter(filterFunc).take(max).toList
+    }
+  }
+
   def size[T](view: KVStoreView[T]): Int = {
     Utils.tryWithResource(view.closeableIterator()) { iter =>
       iter.asScala.size
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
index 66fac8a9d105..61e800844db0 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
@@ -82,6 +82,10 @@ private[spark] trait UIRoot {
   def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T
 
   def getApplicationInfoList: Iterator[ApplicationInfo]
+
+  def getApplicationInfoList(max: Int)(
+      filter: ApplicationInfo => Boolean): Iterator[ApplicationInfo]
+
   def getApplicationInfo(appId: String): Option[ApplicationInfo]
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
index 2d5dd97b501f..aaaa08b3340b 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
@@ -38,7 +38,7 @@ private[v1] class ApplicationListResource extends ApiRequestContext {
     val includeCompleted = status.isEmpty || status.contains(ApplicationStatus.COMPLETED)
     val includeRunning = status.isEmpty || status.contains(ApplicationStatus.RUNNING)
 
-    uiRoot.getApplicationInfoList.filter { app =>
+    uiRoot.getApplicationInfoList(numApps) { app =>
       val anyRunning = app.attempts.isEmpty || !app.attempts.head.completed
       // if any attempt is still running, we consider the app to also still be running;
       // keep the app if *any* attempts fall in the right time window
@@ -46,7 +46,7 @@ private[v1] class ApplicationListResource extends ApiRequestContext {
       app.attempts.exists { attempt =>
         isAttemptInRange(attempt, minDate, maxDate, minEndDate, maxEndDate, anyRunning)
       }
-    }.take(numApps)
+    }
   }
 
   private def isAttemptInRange(
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index b8d422c9d9fb..c6a0b6ae27ce 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -201,6 +201,11 @@ private[spark] class SparkUI private (
     ))
   }
 
+  override def getApplicationInfoList(max: Int)(
+      filter: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = {
+    getApplicationInfoList.filter(filter).take(max)
+  }
+
   def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
     getApplicationInfoList.find(_.id == appId)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
