This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9caa378fe3e1 [SPARK-43403][UI] Ensure old SparkUI in HistoryServer has 
been detached before loading new one
9caa378fe3e1 is described below

commit 9caa378fe3e1eb236bceda36028ec5ee6ed677b1
Author: zhouyifan279 <[email protected]>
AuthorDate: Wed Jan 24 10:18:05 2024 +0800

    [SPARK-43403][UI] Ensure old SparkUI in HistoryServer has been detached 
before loading new one
    
    ### What changes were proposed in this pull request?
    Ensure old SparkUI in HistoryServer has been detached before loading new 
one.
    
    ### Why are the changes needed?
    The error described in SPARK-43403 happened quite often when users opened 
SparkUIs of in-progress Apps frequently.
    
    It happened when `FsHistoryProvider#onUIDetached` ran (in thread-a) right 
after a new SparkUI with the same appId and attemptId was loaded (in thread-b).
    
    To avoid this, we let loading of the new SparkUI wait until old SparkUI is 
detached.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add test `Load new SparkUI during old one is detaching` in 
`ApplicationCacheSuite`
    
    Closes #44851 from zhouyifan279/db_is_closed.
    
    Lead-authored-by: zhouyifan279 <[email protected]>
    Co-authored-by: Kent Yao <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 .../spark/deploy/history/ApplicationCache.scala    | 21 ++++-
 .../deploy/history/ApplicationCacheSuite.scala     | 92 +++++++++++++++++++++-
 2 files changed, 108 insertions(+), 5 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
index 9d2531016775..96313c58ad28 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.history
 
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, 
ExecutionException}
 import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, 
ServletException, ServletRequest, ServletResponse}
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
@@ -48,11 +48,24 @@ private[history] class ApplicationCache(
     val retainedApplications: Int,
     val clock: Clock) extends Logging {
 
+  /**
+   * Keep track of SparkUIs in [[ApplicationCache#appCache]] and SparkUIs 
removed from
+   * [[ApplicationCache#appCache]] but not detached yet.
+   */
+  private val loadedApps = new ConcurrentHashMap[CacheKey, CountDownLatch]()
+
   private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
 
     /** the cache key doesn't match a cached entry, or the entry is 
out-of-date, so load it. */
     override def load(key: CacheKey): CacheEntry = {
-      loadApplicationEntry(key.appId, key.attemptId)
+      // Ensure old SparkUI has been detached before loading new one.
+      val removalLatch = loadedApps.get(key)
+      if (removalLatch != null) {
+        removalLatch.await()
+      }
+      val entry = loadApplicationEntry(key.appId, key.attemptId)
+      loadedApps.put(key, new CountDownLatch(1))
+      entry
     }
 
   }
@@ -63,11 +76,13 @@ private[history] class ApplicationCache(
      * Removal event notifies the provider to detach the UI.
      * @param rm removal notification
      */
-    override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): 
Unit = {
+    override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): 
Unit = try {
       metrics.evictionCount.inc()
       val key = rm.getKey
       logDebug(s"Evicting entry ${key}")
       operations.detachSparkUI(key.appId, key.attemptId, 
rm.getValue().loadedUI.ui)
+    } finally {
+      loadedApps.remove(rm.getKey()).countDown()
     }
   }
 
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
index 25d668ad75cc..1d5ec57a55b5 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
@@ -18,11 +18,13 @@
 package org.apache.spark.deploy.history
 
 import java.util.{Date, NoSuchElementException}
+import java.util.concurrent.{CountDownLatch, Executors, TimeoutException, 
TimeUnit}
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
 import scala.collection.mutable
 
 import com.codahale.metrics.Counter
+import org.apache.hadoop.conf.Configuration
 import org.eclipse.jetty.servlet.ServletContextHandler
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
@@ -31,11 +33,13 @@ import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
 import org.scalatestplus.mockito.MockitoSugar
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History.{HISTORY_LOG_DIR, 
LOCAL_STORE_DIR}
+import org.apache.spark.scheduler.SparkListenerApplicationStart
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => AttemptInfo, 
ApplicationInfo}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{ManualClock, Utils}
 
 class ApplicationCacheSuite extends SparkFunSuite with MockitoSugar with 
Matchers {
 
@@ -381,4 +385,88 @@ class ApplicationCacheSuite extends SparkFunSuite with 
MockitoSugar with Matcher
     
verify(resp).sendRedirect("http://localhost:18080/history/local-123/jobs/job/?id=2")
   }
 
+  test("SPARK-43403: Load new SparkUI during old one is detaching") {
+    val historyLogDir = Utils.createTempDir()
+    val sparkConf = new SparkConf()
+      .set(HISTORY_LOG_DIR, historyLogDir.getAbsolutePath())
+      .set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath())
+
+    val detachStarted = new CountDownLatch(1)
+    val detachFinished = new CountDownLatch(1)
+    val blockBeforeDetach = new CountDownLatch(1)
+    val blockAfterDetach = new CountDownLatch(1)
+    val fsHistoryProvider = new FsHistoryProvider(sparkConf)
+    val threadPool = Executors.newFixedThreadPool(2)
+
+    val operations = new ApplicationCacheOperations {
+
+      override def getAppUI(appId: String, attemptId: Option[String]): 
Option[LoadedAppUI] =
+        fsHistoryProvider.getAppUI(appId, attemptId)
+
+      override def attachSparkUI(
+        appId: String,
+        attemptId: Option[String],
+        ui: SparkUI, completed: Boolean): Unit = {}
+
+      override def detachSparkUI(appId: String, attemptId: Option[String], ui: 
SparkUI): Unit = {
+        detachStarted.countDown()
+        blockBeforeDetach.await()
+        try {
+          fsHistoryProvider.onUIDetached(appId, attemptId, ui)
+        } finally {
+          detachFinished.countDown()
+          blockAfterDetach.await()
+        }
+      }
+    }
+    val cache = new ApplicationCache(operations, 2, new ManualClock())
+    val metrics = cache.metrics
+
+    val index = 1
+    val appId = s"app$index"
+    EventLogTestHelper.writeEventLogFile(sparkConf, new Configuration(), 
historyLogDir, index,
+      Seq(SparkListenerApplicationStart("SPARK-43403", Some(appId), 3L, 
"test", None)))
+    fsHistoryProvider.checkForLogs()
+
+    // Load first SparkUI
+    val ui = cache.get(appId)
+    val loadCount = metrics.loadCount.getCount
+
+    // Simulate situation: LoadedAppUI was invalidated because EventLog had 
changed
+    threadPool.execute(() => {
+      ui.loadedUI.invalidate()
+      ui.loadedUI.ui.store.close()
+      cache.invalidate(CacheKey(appId, None))
+    })
+
+    // Wait for first SparkUI to detach after being removed from cache
+    detachStarted.await()
+
+    // Expect TimeoutException because old SparkUI is not detached yet
+    var loadedUI: LoadedAppUI = null
+    var exception: Exception = null
+    try {
+      loadedUI = threadPool.submit(() => cache.get(appId).loadedUI)
+        .get(5, TimeUnit.SECONDS)
+    } catch {
+      case e: Exception =>
+        exception = e
+    }
+    // Unblock to start detaching
+    blockBeforeDetach.countDown()
+    // Wait for old SparkUI detached
+    detachFinished.await()
+    // Without this PR, "java.lang.IllegalStateException: DB is closed" would 
be
+    // thrown when reading from newly loaded SparkUI
+    if (loadedUI != null) {
+      loadedUI.ui.store.appSummary()
+    }
+    assert(exception != null && exception.isInstanceOf[TimeoutException])
+    assert(loadCount == metrics.loadCount.getCount)
+
+    // Unblock method `onUIDetached` so that new SparkUI can be loaded.
+    blockAfterDetach.countDown()
+    cache.get(appId)
+    assert(loadCount + 1 == metrics.loadCount.getCount)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to