Repository: spark
Updated Branches:
  refs/heads/master 472db58cb -> c7f38e5ad


http://git-wip-us.apache.org/repos/asf/spark/blob/c7f38e5a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala 
b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 6e94073..ee645f6 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
+import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1.{ApiRootResource, 
ApplicationAttemptInfo, ApplicationInfo,
   UIRoot}
 import org.apache.spark.storage.StorageStatusListener
@@ -39,6 +40,7 @@ import org.apache.spark.util.Utils
  * Top level user interface for a Spark application.
  */
 private[spark] class SparkUI private (
+    val store: AppStatusStore,
     val sc: Option[SparkContext],
     val conf: SparkConf,
     securityManager: SecurityManager,
@@ -51,7 +53,8 @@ private[spark] class SparkUI private (
     var appName: String,
     val basePath: String,
     val lastUpdateTime: Option[Long] = None,
-    val startTime: Long)
+    val startTime: Long,
+    val appSparkVersion: String)
   extends WebUI(securityManager, securityManager.getSSLOptions("ui"), 
SparkUI.getUIPort(conf),
     conf, basePath, "SparkUI")
   with Logging
@@ -61,8 +64,6 @@ private[spark] class SparkUI private (
 
   var appId: String = _
 
-  var appSparkVersion = org.apache.spark.SPARK_VERSION
-
   private var streamingJobProgressListener: Option[SparkListener] = None
 
   /** Initialize all components of the server. */
@@ -104,8 +105,12 @@ private[spark] class SparkUI private (
     logInfo(s"Stopped Spark web UI at $webUrl")
   }
 
-  def getSparkUI(appId: String): Option[SparkUI] = {
-    if (appId == this.appId) Some(this) else None
+  override def withSparkUI[T](appId: String, attemptId: Option[String])(fn: 
SparkUI => T): T = {
+    if (appId == this.appId) {
+      fn(this)
+    } else {
+      throw new NoSuchElementException()
+    }
   }
 
   def getApplicationInfoList: Iterator[ApplicationInfo] = {
@@ -159,63 +164,26 @@ private[spark] object SparkUI {
     conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
   }
 
-  def createLiveUI(
-      sc: SparkContext,
-      conf: SparkConf,
-      jobProgressListener: JobProgressListener,
-      securityManager: SecurityManager,
-      appName: String,
-      startTime: Long): SparkUI = {
-    create(Some(sc), conf,
-      sc.listenerBus.addToStatusQueue,
-      securityManager, appName, jobProgressListener = 
Some(jobProgressListener),
-      startTime = startTime)
-  }
-
-  def createHistoryUI(
-      conf: SparkConf,
-      listenerBus: SparkListenerBus,
-      securityManager: SecurityManager,
-      appName: String,
-      basePath: String,
-      lastUpdateTime: Option[Long],
-      startTime: Long): SparkUI = {
-    val sparkUI = create(None, conf, listenerBus.addListener, securityManager, 
appName, basePath,
-      lastUpdateTime = lastUpdateTime, startTime = startTime)
-
-    val listenerFactories = 
ServiceLoader.load(classOf[SparkHistoryListenerFactory],
-      Utils.getContextOrSparkClassLoader).asScala
-    listenerFactories.foreach { listenerFactory =>
-      val listeners = listenerFactory.createListeners(conf, sparkUI)
-      listeners.foreach(listenerBus.addListener)
-    }
-    sparkUI
-  }
-
   /**
-   * Create a new Spark UI.
-   *
-   * @param sc optional SparkContext; this can be None when reconstituting a 
UI from event logs.
-   * @param jobProgressListener if supplied, this JobProgressListener will be 
used; otherwise, the
-   *                            web UI will create and register its own 
JobProgressListener.
+   * Create a new UI backed by an AppStatusStore.
    */
-  private def create(
+  def create(
       sc: Option[SparkContext],
+      store: AppStatusStore,
       conf: SparkConf,
       addListenerFn: SparkListenerInterface => Unit,
       securityManager: SecurityManager,
       appName: String,
-      basePath: String = "",
-      jobProgressListener: Option[JobProgressListener] = None,
+      basePath: String,
+      startTime: Long,
       lastUpdateTime: Option[Long] = None,
-      startTime: Long): SparkUI = {
+      appSparkVersion: String = org.apache.spark.SPARK_VERSION): SparkUI = {
 
-    val _jobProgressListener: JobProgressListener = 
jobProgressListener.getOrElse {
+    val jobProgressListener = sc.map(_.jobProgressListener).getOrElse {
       val listener = new JobProgressListener(conf)
       addListenerFn(listener)
       listener
     }
-
     val environmentListener = new EnvironmentListener
     val storageStatusListener = new StorageStatusListener(conf)
     val executorsListener = new ExecutorsListener(storageStatusListener, conf)
@@ -228,8 +196,9 @@ private[spark] object SparkUI {
     addListenerFn(storageListener)
     addListenerFn(operationGraphListener)
 
-    new SparkUI(sc, conf, securityManager, environmentListener, 
storageStatusListener,
-      executorsListener, _jobProgressListener, storageListener, 
operationGraphListener,
-      appName, basePath, lastUpdateTime, startTime)
+    new SparkUI(store, sc, conf, securityManager, environmentListener, 
storageStatusListener,
+      executorsListener, jobProgressListener, storageListener, 
operationGraphListener,
+      appName, basePath, lastUpdateTime, startTime, appSparkVersion)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c7f38e5a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
index 6e50e84..44f9c56 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
@@ -18,15 +18,11 @@
 package org.apache.spark.deploy.history
 
 import java.util.{Date, NoSuchElementException}
-import javax.servlet.Filter
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
 import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
 
 import com.codahale.metrics.Counter
-import com.google.common.cache.LoadingCache
-import com.google.common.util.concurrent.UncheckedExecutionException
 import org.eclipse.jetty.servlet.ServletContextHandler
 import org.mockito.Matchers._
 import org.mockito.Mockito._
@@ -39,24 +35,11 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => AttemptInfo, 
ApplicationInfo}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.util.ManualClock
 
 class ApplicationCacheSuite extends SparkFunSuite with Logging with 
MockitoSugar with Matchers {
 
   /**
-   * subclass with access to the cache internals
-   * @param retainedApplications number of retained applications
-   */
-  class TestApplicationCache(
-      operations: ApplicationCacheOperations = new StubCacheOperations(),
-      retainedApplications: Int,
-      clock: Clock = new ManualClock(0))
-      extends ApplicationCache(operations, retainedApplications, clock) {
-
-    def cache(): LoadingCache[CacheKey, CacheEntry] = appCache
-  }
-
-  /**
    * Stub cache operations.
    * The state is kept in a map of [[CacheKey]] to [[CacheEntry]],
    * the `probeTime` field in the cache entry setting the timestamp of the 
entry
@@ -77,8 +60,7 @@ class ApplicationCacheSuite extends SparkFunSuite with 
Logging with MockitoSugar
     override def getAppUI(appId: String, attemptId: Option[String]): 
Option[LoadedAppUI] = {
       logDebug(s"getAppUI($appId, $attemptId)")
       getAppUICount += 1
-      instances.get(CacheKey(appId, attemptId)).map( e =>
-        LoadedAppUI(e.ui, () => updateProbe(appId, attemptId, e.probeTime)))
+      instances.get(CacheKey(appId, attemptId)).map { e => e.loadedUI }
     }
 
     override def attachSparkUI(
@@ -96,10 +78,9 @@ class ApplicationCacheSuite extends SparkFunSuite with 
Logging with MockitoSugar
         attemptId: Option[String],
         completed: Boolean,
         started: Long,
-        ended: Long,
-        timestamp: Long): SparkUI = {
-      val ui = putAppUI(appId, attemptId, completed, started, ended, timestamp)
-      attachSparkUI(appId, attemptId, ui, completed)
+        ended: Long): LoadedAppUI = {
+      val ui = putAppUI(appId, attemptId, completed, started, ended)
+      attachSparkUI(appId, attemptId, ui.ui, completed)
       ui
     }
 
@@ -108,23 +89,12 @@ class ApplicationCacheSuite extends SparkFunSuite with 
Logging with MockitoSugar
         attemptId: Option[String],
         completed: Boolean,
         started: Long,
-        ended: Long,
-        timestamp: Long): SparkUI = {
-      val ui = newUI(appId, attemptId, completed, started, ended)
-      putInstance(appId, attemptId, ui, completed, timestamp)
+        ended: Long): LoadedAppUI = {
+      val ui = LoadedAppUI(newUI(appId, attemptId, completed, started, ended))
+      instances(CacheKey(appId, attemptId)) = new CacheEntry(ui, completed)
       ui
     }
 
-    def putInstance(
-        appId: String,
-        attemptId: Option[String],
-        ui: SparkUI,
-        completed: Boolean,
-        timestamp: Long): Unit = {
-      instances += (CacheKey(appId, attemptId) ->
-          new CacheEntry(ui, completed, () => updateProbe(appId, attemptId, 
timestamp), timestamp))
-    }
-
     /**
      * Detach a reconstructed UI
      *
@@ -146,23 +116,6 @@ class ApplicationCacheSuite extends SparkFunSuite with 
Logging with MockitoSugar
       attached.get(CacheKey(appId, attemptId))
     }
 
-    /**
-     * The update probe.
-     * @param appId application to probe
-     * @param attemptId attempt to probe
-     * @param updateTime timestamp of this UI load
-     */
-    private[history] def updateProbe(
-        appId: String,
-        attemptId: Option[String],
-        updateTime: Long)(): Boolean = {
-      updateProbeCount += 1
-      logDebug(s"isUpdated($appId, $attemptId, ${updateTime})")
-      val entry = instances.get(CacheKey(appId, attemptId)).get
-      val updated = entry.probeTime > updateTime
-      logDebug(s"entry = $entry; updated = $updated")
-      updated
-    }
   }
 
   /**
@@ -210,15 +163,13 @@ class ApplicationCacheSuite extends SparkFunSuite with 
Logging with MockitoSugar
 
     val now = clock.getTimeMillis()
     // add the entry
-    operations.putAppUI(app1, None, true, now, now, now)
+    operations.putAppUI(app1, None, true, now, now)
 
     // make sure its local
     operations.getAppUI(app1, None).get
     operations.getAppUICount = 0
     // now expect it to be found
-    val cacheEntry = cache.lookupCacheEntry(app1, None)
-    assert(1 === cacheEntry.probeTime)
-    assert(cacheEntry.completed)
+    cache.withSparkUI(app1, None) { _ => }
     // assert about queries made of the operations
     assert(1 === operations.getAppUICount, "getAppUICount")
     assert(1 === operations.attachCount, "attachCount")
@@ -236,8 +187,8 @@ class ApplicationCacheSuite extends SparkFunSuite with 
Logging with MockitoSugar
     assert(0 === operations.detachCount, "attachCount")
 
     // evict the entry
-    operations.putAndAttach("2", None, true, time2, time2, time2)
-    operations.putAndAttach("3", None, true, time2, time2, time2)
+    operations.putAndAttach("2", None, true, time2, time2)
+    operations.putAndAttach("3", None, true, time2, time2)
     cache.get("2")
     cache.get("3")
 
@@ -248,7 +199,7 @@ class ApplicationCacheSuite extends SparkFunSuite with 
Logging with MockitoSugar
     val appId = "app1"
     val attemptId = Some("_01")
     val time3 = clock.getTimeMillis()
-    operations.putAppUI(appId, attemptId, false, time3, 0, time3)
+    operations.putAppUI(appId, attemptId, false, time3, 0)
     // expect an error here
     assertNotFound(appId, None)
   }
@@ -256,10 +207,11 @@ class ApplicationCacheSuite extends SparkFunSuite with 
Logging with MockitoSugar
   test("Test that if an attempt ID is set, it must be used in lookups") {
     val operations = new StubCacheOperations()
     val clock = new ManualClock(1)
-    implicit val cache = new ApplicationCache(operations, retainedApplications 
= 10, clock = clock)
+    implicit val cache = new ApplicationCache(operations, retainedApplications 
= 10,
+      clock = clock)
     val appId = "app1"
     val attemptId = Some("_01")
-    operations.putAppUI(appId, attemptId, false, clock.getTimeMillis(), 0, 0)
+    operations.putAppUI(appId, attemptId, false, clock.getTimeMillis(), 0)
     assertNotFound(appId, None)
   }
 
@@ -271,50 +223,29 @@ class ApplicationCacheSuite extends SparkFunSuite with 
Logging with MockitoSugar
   test("Incomplete apps refreshed") {
     val operations = new StubCacheOperations()
     val clock = new ManualClock(50)
-    val window = 500
-    implicit val cache = new ApplicationCache(operations, retainedApplications 
= 5, clock = clock)
+    implicit val cache = new ApplicationCache(operations, 5, clock)
     val metrics = cache.metrics
     // add the incomplete app
     // add the entry
     val started = clock.getTimeMillis()
     val appId = "app1"
     val attemptId = Some("001")
-    operations.putAppUI(appId, attemptId, false, started, 0, started)
-    val firstEntry = cache.lookupCacheEntry(appId, attemptId)
-    assert(started === firstEntry.probeTime, s"timestamp in $firstEntry")
-    assert(!firstEntry.completed, s"entry is complete: $firstEntry")
-    assertMetric("lookupCount", metrics.lookupCount, 1)
+    val initialUI = operations.putAndAttach(appId, attemptId, false, started, 
0)
 
+    val firstUI = cache.withSparkUI(appId, attemptId) { ui => ui }
+    assertMetric("lookupCount", metrics.lookupCount, 1)
     assert(0 === operations.updateProbeCount, "expected no update probe on 
that first get")
 
-    val checkTime = window * 2
-    clock.setTime(checkTime)
-    val entry3 = cache.lookupCacheEntry(appId, attemptId)
-    assert(firstEntry !== entry3, s"updated entry test from $cache")
+    // Invalidate the first entry to trigger a re-load.
+    initialUI.invalidate()
+
+    // Update the UI in the stub so that a new one is provided to the cache.
+    operations.putAppUI(appId, attemptId, true, started, started + 10)
+
+    val updatedUI = cache.withSparkUI(appId, attemptId) { ui => ui }
+    assert(firstUI !== updatedUI, s"expected updated UI")
     assertMetric("lookupCount", metrics.lookupCount, 2)
-    assertMetric("updateProbeCount", metrics.updateProbeCount, 1)
-    assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 0)
-    assert(1 === operations.updateProbeCount, s"refresh count in $cache")
-    assert(0 === operations.detachCount, s"detach count")
-    assert(entry3.probeTime === checkTime)
-
-    val updateTime = window * 3
-    // update the cached value
-    val updatedApp = operations.putAppUI(appId, attemptId, true, started, 
updateTime, updateTime)
-    val endTime = window * 10
-    clock.setTime(endTime)
-    logDebug(s"Before operation = $cache")
-    val entry5 = cache.lookupCacheEntry(appId, attemptId)
-    assertMetric("lookupCount", metrics.lookupCount, 3)
-    assertMetric("updateProbeCount", metrics.updateProbeCount, 2)
-    // the update was triggered
-    assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 1)
-    assert(updatedApp === entry5.ui, s"UI {$updatedApp} did not match entry 
{$entry5} in $cache")
-
-    // at which point, the refreshes stop
-    clock.setTime(window * 20)
-    assertCacheEntryEquals(appId, attemptId, entry5)
-    assertMetric("updateProbeCount", metrics.updateProbeCount, 2)
+    assert(1 === operations.detachCount, s"detach count")
   }
 
   /**
@@ -338,27 +269,6 @@ class ApplicationCacheSuite extends SparkFunSuite with 
Logging with MockitoSugar
   }
 
   /**
-   * Look up the cache entry and assert that it matches in the expected value.
-   * This assertion works if the two CacheEntries are different -it looks at 
the fields.
-   * UI are compared on object equality; the timestamp and completed flags 
directly.
-   * @param appId application ID
-   * @param attemptId attempt ID
-   * @param expected expected value
-   * @param cache app cache
-   */
-  def assertCacheEntryEquals(
-      appId: String,
-      attemptId: Option[String],
-      expected: CacheEntry)
-      (implicit cache: ApplicationCache): Unit = {
-    val actual = cache.lookupCacheEntry(appId, attemptId)
-    val errorText = s"Expected get($appId, $attemptId) -> $expected, but got 
$actual from $cache"
-    assert(expected.ui === actual.ui, errorText + " SparkUI reference")
-    assert(expected.completed === actual.completed, errorText + " -completed 
flag")
-    assert(expected.probeTime === actual.probeTime, errorText + " -timestamp")
-  }
-
-  /**
    * Assert that a key wasn't found in cache or loaded.
    *
    * Looks for the specific nested exception raised by [[ApplicationCache]]
@@ -370,14 +280,9 @@ class ApplicationCacheSuite extends SparkFunSuite with 
Logging with MockitoSugar
       appId: String,
       attemptId: Option[String])
       (implicit cache: ApplicationCache): Unit = {
-    val ex = intercept[UncheckedExecutionException] {
+    val ex = intercept[NoSuchElementException] {
       cache.get(appId, attemptId)
     }
-    var cause = ex.getCause
-    assert(cause !== null)
-    if (!cause.isInstanceOf[NoSuchElementException]) {
-      throw cause
-    }
   }
 
   test("Large Scale Application Eviction") {
@@ -385,12 +290,12 @@ class ApplicationCacheSuite extends SparkFunSuite with 
Logging with MockitoSugar
     val clock = new ManualClock(0)
     val size = 5
     // only two entries are retained, so we expect evictions to occur on 
lookups
-    implicit val cache: ApplicationCache = new TestApplicationCache(operations,
-      retainedApplications = size, clock = clock)
+    implicit val cache = new ApplicationCache(operations, retainedApplications 
= size,
+      clock = clock)
 
     val attempt1 = Some("01")
 
-    val ids = new ListBuffer[String]()
+    val ids = new mutable.ListBuffer[String]()
     // build a list of applications
     val count = 100
     for (i <- 1 to count ) {
@@ -398,7 +303,7 @@ class ApplicationCacheSuite extends SparkFunSuite with 
Logging with MockitoSugar
       ids += appId
       clock.advance(10)
       val t = clock.getTimeMillis()
-      operations.putAppUI(appId, attempt1, true, t, t, t)
+      operations.putAppUI(appId, attempt1, true, t, t)
     }
     // now go through them in sequence reading them, expect evictions
     ids.foreach { id =>
@@ -413,20 +318,19 @@ class ApplicationCacheSuite extends SparkFunSuite with 
Logging with MockitoSugar
 
   test("Attempts are Evicted") {
     val operations = new StubCacheOperations()
-    implicit val cache: ApplicationCache = new TestApplicationCache(operations,
-      retainedApplications = 4)
+    implicit val cache = new ApplicationCache(operations, 4, new ManualClock())
     val metrics = cache.metrics
     val appId = "app1"
     val attempt1 = Some("01")
     val attempt2 = Some("02")
     val attempt3 = Some("03")
-    operations.putAppUI(appId, attempt1, true, 100, 110, 110)
-    operations.putAppUI(appId, attempt2, true, 200, 210, 210)
-    operations.putAppUI(appId, attempt3, true, 300, 310, 310)
+    operations.putAppUI(appId, attempt1, true, 100, 110)
+    operations.putAppUI(appId, attempt2, true, 200, 210)
+    operations.putAppUI(appId, attempt3, true, 300, 310)
     val attempt4 = Some("04")
-    operations.putAppUI(appId, attempt4, true, 400, 410, 410)
+    operations.putAppUI(appId, attempt4, true, 400, 410)
     val attempt5 = Some("05")
-    operations.putAppUI(appId, attempt5, true, 500, 510, 510)
+    operations.putAppUI(appId, attempt5, true, 500, 510)
 
     def expectLoadAndEvictionCounts(expectedLoad: Int, expectedEvictionCount: 
Int): Unit = {
       assertMetric("loadCount", metrics.loadCount, expectedLoad)
@@ -457,20 +361,14 @@ class ApplicationCacheSuite extends SparkFunSuite with 
Logging with MockitoSugar
 
   }
 
-  test("Instantiate Filter") {
-    // this is a regression test on the filter being constructable
-    val clazz = 
Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME)
-    val instance = clazz.newInstance()
-    instance shouldBe a [Filter]
-  }
-
   test("redirect includes query params") {
-    val clazz = 
Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME)
-    val filter = clazz.newInstance().asInstanceOf[ApplicationCacheCheckFilter]
-    filter.appId = "local-123"
+    val operations = new StubCacheOperations()
+    val ui = operations.putAndAttach("foo", None, true, 0, 10)
     val cache = mock[ApplicationCache]
-    when(cache.checkForUpdates(any(), any())).thenReturn(true)
-    ApplicationCacheCheckFilterRelay.setApplicationCache(cache)
+    when(cache.operations).thenReturn(operations)
+    val filter = new ApplicationCacheCheckFilter(new CacheKey("foo", None), 
ui, cache)
+    ui.invalidate()
+
     val request = mock[HttpServletRequest]
     when(request.getMethod()).thenReturn("GET")
     
when(request.getRequestURI()).thenReturn("http://localhost:18080/history/local-123/jobs/job/")

http://git-wip-us.apache.org/repos/asf/spark/blob/c7f38e5a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 03bd3ea..86c8cdf 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -41,6 +41,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.io._
 import org.apache.spark.scheduler._
 import org.apache.spark.security.GroupMappingServiceProvider
+import org.apache.spark.status.AppStatusStore
 import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
 
 class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with 
Matchers with Logging {
@@ -612,7 +613,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
BeforeAndAfter with Matc
     // Manually overwrite the version in the listing db; this should cause the 
new provider to
     // discard all data because the versions don't match.
     val meta = new 
FsHistoryProviderMetadata(FsHistoryProvider.CURRENT_LISTING_VERSION + 1,
-      conf.get(LOCAL_STORE_DIR).get)
+      AppStatusStore.CURRENT_VERSION, conf.get(LOCAL_STORE_DIR).get)
     oldProvider.listing.setMetadata(meta)
     oldProvider.stop()
 
@@ -620,6 +621,43 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
BeforeAndAfter with Matc
     
assert(mistatchedVersionProvider.listing.count(classOf[ApplicationInfoWrapper]) 
=== 0)
   }
 
+  test("invalidate cached UI") {
+    val provider = new FsHistoryProvider(createTestConf())
+    val appId = "new1"
+
+    // Write an incomplete app log.
+    val appLog = newLogFile(appId, None, inProgress = true)
+    writeFile(appLog, true, None,
+      SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None)
+      )
+    provider.checkForLogs()
+
+    // Load the app UI.
+    val oldUI = provider.getAppUI(appId, None)
+    assert(oldUI.isDefined)
+    intercept[NoSuchElementException] {
+      oldUI.get.ui.store.job(0)
+    }
+
+    // Add more info to the app log, and trigger the provider to update things.
+    writeFile(appLog, true, None,
+      SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None),
+      SparkListenerJobStart(0, 1L, Nil, null),
+      SparkListenerApplicationEnd(5L)
+      )
+    provider.checkForLogs()
+
+    // Manually detach the old UI; ApplicationCache would do this 
automatically in a real SHS
+    // when the app's UI was requested.
+    provider.onUIDetached(appId, None, oldUI.get.ui)
+
+    // Load the UI again and make sure we can get the new info added to the 
logs.
+    val freshUI = provider.getAppUI(appId, None)
+    assert(freshUI.isDefined)
+    assert(freshUI != oldUI)
+    freshUI.get.ui.store.job(0)
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform 
checks on the updated
    * app list. Example:

http://git-wip-us.apache.org/repos/asf/spark/blob/c7f38e5a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index c11543a..010a8dd 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -72,6 +72,8 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
   private var port: Int = -1
 
   def init(extraConf: (String, String)*): Unit = {
+    Utils.deleteRecursively(storeDir)
+    assert(storeDir.mkdir())
     val conf = new SparkConf()
       .set("spark.history.fs.logDirectory", logDir)
       .set("spark.history.fs.update.interval", "0")
@@ -292,21 +294,8 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
     val uiRoot = "/testwebproxybase"
     System.setProperty("spark.ui.proxyBase", uiRoot)
 
-    server.stop()
-
-    val conf = new SparkConf()
-      .set("spark.history.fs.logDirectory", logDir)
-      .set("spark.history.fs.update.interval", "0")
-      .set("spark.testing", "true")
-      .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
-
-    provider = new FsHistoryProvider(conf)
-    provider.checkForLogs()
-    val securityManager = HistoryServer.createSecurityManager(conf)
-
-    server = new HistoryServer(conf, provider, securityManager, 18080)
-    server.initialize()
-    server.bind()
+    stop()
+    init()
 
     val port = server.boundPort
 
@@ -375,8 +364,6 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
   }
 
   test("incomplete apps get refreshed") {
-    server.stop()
-
     implicit val webDriver: WebDriver = new HtmlUnitDriver
     implicit val formats = org.json4s.DefaultFormats
 
@@ -386,6 +373,7 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
 
     // a new conf is used with the background thread set and running at its 
fastest
     // allowed refresh rate (1Hz)
+    stop()
     val myConf = new SparkConf()
       .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
       .set("spark.eventLog.dir", logDir.getAbsolutePath)
@@ -418,7 +406,7 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
       }
     }
 
-    server = new HistoryServer(myConf, provider, securityManager, 18080)
+    server = new HistoryServer(myConf, provider, securityManager, 0)
     server.initialize()
     server.bind()
     val port = server.boundPort
@@ -464,7 +452,7 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
     rootAppPage should not be empty
 
     def getAppUI: SparkUI = {
-      provider.getAppUI(appId, None).get.ui
+      server.withSparkUI(appId, None) { ui => ui }
     }
 
     // selenium isn't that useful on failures...add our own reporting
@@ -519,7 +507,7 @@ class HistoryServerSuite extends SparkFunSuite with 
BeforeAndAfter with Matchers
     getNumJobs("") should be (1)
     getNumJobs("/jobs") should be (1)
     getNumJobsRestful() should be (1)
-    assert(metrics.lookupCount.getCount > 1, s"lookup count too low in 
$metrics")
+    assert(metrics.lookupCount.getCount > 0, s"lookup count too low in 
$metrics")
 
     // dump state before the next bit of test, which is where update
     // checking really gets stressed

http://git-wip-us.apache.org/repos/asf/spark/blob/c7f38e5a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 6f7a0c1..7ac1ce1 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.status
 
 import java.io.File
-import java.util.{Date, Properties}
+import java.lang.{Integer => JInteger, Long => JLong}
+import java.util.{Arrays, Date, Properties}
 
 import scala.collection.JavaConverters._
 import scala.reflect.{classTag, ClassTag}
@@ -36,6 +37,10 @@ import org.apache.spark.util.kvstore._
 
 class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
 
+  import config._
+
+  private val conf = new SparkConf().set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
+
   private var time: Long = _
   private var testDir: File = _
   private var store: KVStore = _
@@ -52,7 +57,7 @@ class AppStatusListenerSuite extends SparkFunSuite with 
BeforeAndAfter {
   }
 
   test("scheduler events") {
-    val listener = new AppStatusListener(store)
+    val listener = new AppStatusListener(store, conf, true)
 
     // Start the application.
     time += 1
@@ -174,6 +179,14 @@ class AppStatusListenerSuite extends SparkFunSuite with 
BeforeAndAfter {
     s1Tasks.foreach { task =>
       check[TaskDataWrapper](task.taskId) { wrapper =>
         assert(wrapper.info.taskId === task.taskId)
+        assert(wrapper.stageId === stages.head.stageId)
+        assert(wrapper.stageAttemptId === stages.head.attemptId)
+        assert(Arrays.equals(wrapper.stage, Array(stages.head.stageId, 
stages.head.attemptId)))
+
+        val runtime = Array[AnyRef](stages.head.stageId: JInteger, 
stages.head.attemptId: JInteger,
+          -1L: JLong)
+        assert(Arrays.equals(wrapper.runtime, runtime))
+
         assert(wrapper.info.index === task.index)
         assert(wrapper.info.attempt === task.attemptNumber)
         assert(wrapper.info.launchTime === new Date(task.launchTime))
@@ -510,7 +523,7 @@ class AppStatusListenerSuite extends SparkFunSuite with 
BeforeAndAfter {
   }
 
   test("storage events") {
-    val listener = new AppStatusListener(store)
+    val listener = new AppStatusListener(store, conf, true)
     val maxMemory = 42L
 
     // Register a couple of block managers.

http://git-wip-us.apache.org/repos/asf/spark/blob/c7f38e5a/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 45b8870..99cac34 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -38,6 +38,8 @@ object MimaExcludes {
   lazy val v23excludes = v22excludes ++ Seq(
     // SPARK-18085: Better History Server scalability for many / large 
applications
     
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ExecutorSummary.executorLogs"),
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.history.HistoryServer.getSparkUI"),
+
     // [SPARK-20495][SQL] Add StorageLevel to cacheTable API
     
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable"),
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to