This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9caa378fe3e1 [SPARK-43403][UI] Ensure old SparkUI in HistoryServer has
been detached before loading new one
9caa378fe3e1 is described below
commit 9caa378fe3e1eb236bceda36028ec5ee6ed677b1
Author: zhouyifan279 <[email protected]>
AuthorDate: Wed Jan 24 10:18:05 2024 +0800
[SPARK-43403][UI] Ensure old SparkUI in HistoryServer has been detached
before loading new one
### What changes were proposed in this pull request?
Ensure old SparkUI in HistoryServer has been detached before loading new
one.
### Why are the changes needed?
The error described in SPARK-43403 happened quite often when users frequently
opened the SparkUIs of in-progress applications.
It happened when `FsHistoryProvider#onUIDetached` ran (in thread A) right
after a new SparkUI with the same appId and attemptId was loaded (in thread B).
To avoid this, we make the loading of the new SparkUI wait until the old
SparkUI has been detached.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add test `Load new SparkUI during old one is detaching` in
`ApplicationCacheSuite`
Closes #44851 from zhouyifan279/db_is_closed.
Lead-authored-by: zhouyifan279 <[email protected]>
Co-authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../spark/deploy/history/ApplicationCache.scala | 21 ++++-
.../deploy/history/ApplicationCacheSuite.scala | 92 +++++++++++++++++++++-
2 files changed, 108 insertions(+), 5 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
index 9d2531016775..96313c58ad28 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.history
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch,
ExecutionException}
import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig,
ServletException, ServletRequest, ServletResponse}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
@@ -48,11 +48,24 @@ private[history] class ApplicationCache(
val retainedApplications: Int,
val clock: Clock) extends Logging {
+ /**
+ * Keep track of SparkUIs in [[ApplicationCache#appCache]] and SparkUIs
removed from
+ * [[ApplicationCache#appCache]] but not detached yet.
+ */
+ private val loadedApps = new ConcurrentHashMap[CacheKey, CountDownLatch]()
+
private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
/** the cache key doesn't match a cached entry, or the entry is
out-of-date, so load it. */
override def load(key: CacheKey): CacheEntry = {
- loadApplicationEntry(key.appId, key.attemptId)
+ // Ensure old SparkUI has been detached before loading new one.
+ val removalLatch = loadedApps.get(key)
+ if (removalLatch != null) {
+ removalLatch.await()
+ }
+ val entry = loadApplicationEntry(key.appId, key.attemptId)
+ loadedApps.put(key, new CountDownLatch(1))
+ entry
}
}
@@ -63,11 +76,13 @@ private[history] class ApplicationCache(
* Removal event notifies the provider to detach the UI.
* @param rm removal notification
*/
- override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]):
Unit = {
+ override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]):
Unit = try {
metrics.evictionCount.inc()
val key = rm.getKey
logDebug(s"Evicting entry ${key}")
operations.detachSparkUI(key.appId, key.attemptId,
rm.getValue().loadedUI.ui)
+ } finally {
+ loadedApps.remove(rm.getKey()).countDown()
}
}
diff --git
a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
index 25d668ad75cc..1d5ec57a55b5 100644
---
a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
@@ -18,11 +18,13 @@
package org.apache.spark.deploy.history
import java.util.{Date, NoSuchElementException}
+import java.util.concurrent.{CountDownLatch, Executors, TimeoutException,
TimeUnit}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import scala.collection.mutable
import com.codahale.metrics.Counter
+import org.apache.hadoop.conf.Configuration
import org.eclipse.jetty.servlet.ServletContextHandler
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
@@ -31,11 +33,13 @@ import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._
import org.scalatestplus.mockito.MockitoSugar
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History.{HISTORY_LOG_DIR,
LOCAL_STORE_DIR}
+import org.apache.spark.scheduler.SparkListenerApplicationStart
import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => AttemptInfo,
ApplicationInfo}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{ManualClock, Utils}
class ApplicationCacheSuite extends SparkFunSuite with MockitoSugar with
Matchers {
@@ -381,4 +385,88 @@ class ApplicationCacheSuite extends SparkFunSuite with
MockitoSugar with Matcher
verify(resp).sendRedirect("http://localhost:18080/history/local-123/jobs/job/?id=2")
}
+ test("SPARK-43403: Load new SparkUI during old one is detaching") {
+ val historyLogDir = Utils.createTempDir()
+ val sparkConf = new SparkConf()
+ .set(HISTORY_LOG_DIR, historyLogDir.getAbsolutePath())
+ .set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath())
+
+ val detachStarted = new CountDownLatch(1)
+ val detachFinished = new CountDownLatch(1)
+ val blockBeforeDetach = new CountDownLatch(1)
+ val blockAfterDetach = new CountDownLatch(1)
+ val fsHistoryProvider = new FsHistoryProvider(sparkConf)
+ val threadPool = Executors.newFixedThreadPool(2)
+
+ val operations = new ApplicationCacheOperations {
+
+ override def getAppUI(appId: String, attemptId: Option[String]):
Option[LoadedAppUI] =
+ fsHistoryProvider.getAppUI(appId, attemptId)
+
+ override def attachSparkUI(
+ appId: String,
+ attemptId: Option[String],
+ ui: SparkUI, completed: Boolean): Unit = {}
+
+ override def detachSparkUI(appId: String, attemptId: Option[String], ui:
SparkUI): Unit = {
+ detachStarted.countDown()
+ blockBeforeDetach.await()
+ try {
+ fsHistoryProvider.onUIDetached(appId, attemptId, ui)
+ } finally {
+ detachFinished.countDown()
+ blockAfterDetach.await()
+ }
+ }
+ }
+ val cache = new ApplicationCache(operations, 2, new ManualClock())
+ val metrics = cache.metrics
+
+ val index = 1
+ val appId = s"app$index"
+ EventLogTestHelper.writeEventLogFile(sparkConf, new Configuration(),
historyLogDir, index,
+ Seq(SparkListenerApplicationStart("SPARK-43403", Some(appId), 3L,
"test", None)))
+ fsHistoryProvider.checkForLogs()
+
+ // Load first SparkUI
+ val ui = cache.get(appId)
+ val loadCount = metrics.loadCount.getCount
+
+ // Simulate situation: LoadedAppUI was invalidated because EventLog had
changed
+ threadPool.execute(() => {
+ ui.loadedUI.invalidate()
+ ui.loadedUI.ui.store.close()
+ cache.invalidate(CacheKey(appId, None))
+ })
+
+ // Wait for first SparkUI to detach after being removed from cache
+ detachStarted.await()
+
+ // Expect TimeoutException because old SparkUI is not detached yet
+ var loadedUI: LoadedAppUI = null
+ var exception: Exception = null
+ try {
+ loadedUI = threadPool.submit(() => cache.get(appId).loadedUI)
+ .get(5, TimeUnit.SECONDS)
+ } catch {
+ case e: Exception =>
+ exception = e
+ }
+ // Unblock to start detaching
+ blockBeforeDetach.countDown()
+ // Wait for old SparkUI detached
+ detachFinished.await()
+ // Without this PR, "java.lang.IllegalStateException: DB is closed" would
be
+ // thrown when reading from newly loaded SparkUI
+ if (loadedUI != null) {
+ loadedUI.ui.store.appSummary()
+ }
+ assert(exception != null && exception.isInstanceOf[TimeoutException])
+ assert(loadCount == metrics.loadCount.getCount)
+
+ // Unblock method `onUIDetached` so that new SparkUI can be loaded.
+ blockAfterDetach.countDown()
+ cache.get(appId)
+ assert(loadCount + 1 == metrics.loadCount.getCount)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]