[SPARK-3454] separate json endpoints for data in the UI

Exposes the data available in the UI as JSON over HTTP. Key points:

* New endpoints, handled independently of the existing XyzPage classes. The 
root entry point is `JsonRootResource`.
* Uses Jersey + Jackson for routing and for converting POJOs into JSON (a 
minimal sketch of the endpoint style follows below).
* Tests against known expected results in `HistoryServerSuite`.
* Also fixes some minor issues with the UI: synchronizing access to 
`StorageListener` and `StorageStatusListener`, and fixing inconsistencies in 
how retained jobs and stages are handled.
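
For context, here is a minimal sketch of the Jersey + Jackson resource style 
this patch introduces. The resource name and the returned map are hypothetical; 
the real entry point is `JsonRootResource`, which registers resources such as 
`AllJobsResource` and `AllStagesResource` under `org.apache.spark.status.api.v1`:

```scala
package org.apache.spark.status.api.v1

import javax.ws.rs.{GET, Produces}
import javax.ws.rs.core.MediaType

import org.apache.spark.ui.SparkUI

// Jersey routes the HTTP request to this class; the returned value is
// serialized to JSON by Jackson (via JacksonMessageWriter).
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class ExampleJobCountsResource(ui: SparkUI) {

  @GET
  def jobCounts(): Map[String, Int] = {
    val listener = ui.jobProgressListener
    // Read listener state under its lock, matching the synchronization
    // this change adds around UI listener access.
    listener.synchronized {
      Map(
        "activeJobs" -> listener.activeJobs.size,
        "completedJobs" -> listener.completedJobs.size,
        "failedJobs" -> listener.failedJobs.size)
    }
  }
}
```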

Author: Imran Rashid <[email protected]>

Closes #5940 from squito/SPARK-3454_better_test_files and squashes the 
following commits:

1a72ed6 [Imran Rashid] rats
85fdb3e [Imran Rashid] Merge branch 'no_php' into SPARK-3454
1fc65b0 [Imran Rashid] Revert "Revert "[SPARK-3454] separate json endpoints for 
data in the UI""
1276900 [Imran Rashid] get rid of giant event file, replace w/ smaller one; 
check both shuffle read & shuffle write
4e12013 [Imran Rashid] just use test case name for expectation file name
863ef64 [Imran Rashid] rename json files to avoid strange file names and not 
look like php

(cherry picked from commit c796be70f36e262b6a2ce75924bd970f40bf4045)
Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/532bfdad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/532bfdad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/532bfdad

Branch: refs/heads/branch-1.4
Commit: 532bfdad4a4a4130ee8c166aa52058d2bd0c6a03
Parents: acf4bc1
Author: Imran Rashid <[email protected]>
Authored: Fri May 8 16:54:32 2015 +0100
Committer: Patrick Wendell <[email protected]>
Committed: Fri May 8 16:54:46 2015 +0100

----------------------------------------------------------------------
 .rat-excludes                                   |    7 +
 core/pom.xml                                    |    8 +
 .../org/apache/spark/JobExecutionStatus.java    |    8 +-
 .../spark/status/api/v1/ApplicationStatus.java  |   30 +
 .../apache/spark/status/api/v1/StageStatus.java |   31 +
 .../apache/spark/status/api/v1/TaskSorting.java |   48 +
 .../java/org/apache/spark/util/EnumUtil.java    |   38 +
 .../scala/org/apache/spark/SparkContext.scala   |    2 +-
 .../history/ApplicationHistoryProvider.scala    |    4 +-
 .../deploy/history/FsHistoryProvider.scala      |   14 +-
 .../spark/deploy/history/HistoryServer.scala    |   20 +-
 .../spark/deploy/master/ApplicationInfo.scala   |    2 +-
 .../org/apache/spark/deploy/master/Master.scala |   14 +-
 .../deploy/master/ui/ApplicationPage.scala      |   19 +-
 .../spark/deploy/master/ui/MasterPage.scala     |   12 +-
 .../spark/deploy/master/ui/MasterWebUI.scala    |   24 +-
 .../spark/status/api/v1/AllJobsResource.scala   |   98 ++
 .../spark/status/api/v1/AllRDDResource.scala    |  104 ++
 .../spark/status/api/v1/AllStagesResource.scala |  309 ++++
 .../status/api/v1/ApplicationListResource.scala |   94 ++
 .../status/api/v1/ExecutorListResource.scala    |   36 +
 .../status/api/v1/JacksonMessageWriter.scala    |   93 ++
 .../spark/status/api/v1/JsonRootResource.scala  |  255 ++++
 .../status/api/v1/OneApplicationResource.scala  |   31 +
 .../spark/status/api/v1/OneJobResource.scala    |   41 +
 .../spark/status/api/v1/OneRDDResource.scala    |   34 +
 .../spark/status/api/v1/OneStageResource.scala  |  150 ++
 .../spark/status/api/v1/SecurityFilter.scala    |   38 +
 .../spark/status/api/v1/SimpleDateParam.scala   |   55 +
 .../org/apache/spark/status/api/v1/api.scala    |  228 +++
 .../spark/storage/StorageStatusListener.scala   |    6 +-
 .../scala/org/apache/spark/ui/SparkUI.scala     |   49 +-
 .../main/scala/org/apache/spark/ui/WebUI.scala  |    8 +-
 .../apache/spark/ui/exec/ExecutorsPage.scala    |   17 +-
 .../org/apache/spark/ui/jobs/AllJobsPage.scala  |   14 +-
 .../apache/spark/ui/jobs/AllStagesPage.scala    |   12 +-
 .../org/apache/spark/ui/jobs/JobPage.scala      |    2 +-
 .../spark/ui/jobs/JobProgressListener.scala     |    4 +
 .../org/apache/spark/ui/jobs/PoolPage.scala     |    2 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |   19 +-
 .../org/apache/spark/ui/storage/RDDPage.scala   |   73 +-
 .../apache/spark/ui/storage/StoragePage.scala   |    2 +-
 .../apache/spark/ui/storage/StorageTab.scala    |    6 +-
 .../application_list_json_expectation.json      |   53 +
 .../complete_stage_list_json_expectation.json   |   67 +
 .../completed_app_list_json_expectation.json    |   53 +
 .../executor_list_json_expectation.json         |   17 +
 .../failed_stage_list_json_expectation.json     |   23 +
 ...m_multi_attempt_app_json_1__expectation.json |   15 +
 ...m_multi_attempt_app_json_2__expectation.json |   15 +
 .../job_list_json_expectation.json              |   43 +
 .../maxDate2_app_list_json_expectation.json     |   10 +
 .../maxDate_app_list_json_expectation.json      |   19 +
 .../minDate_app_list_json_expectation.json      |   35 +
 .../one_app_json_expectation.json               |   10 +
 .../one_app_multi_attempt_json_expectation.json |   17 +
 .../one_job_json_expectation.json               |   15 +
 .../one_rdd_storage_json_expectation.json       |   64 +
 .../one_stage_attempt_json_expectation.json     |  270 ++++
 .../one_stage_json_expectation.json             |  270 ++++
 .../rdd_list_storage_json_expectation.json      |    9 +
 .../running_app_list_json_expectation.json      |    1 +
 .../stage_list_json_expectation.json            |   89 ++
 ..._list_with_accumulable_json_expectation.json |   27 +
 .../stage_task_list_expectation.json            |  561 +++++++
 ...m_multi_attempt_app_json_1__expectation.json |  193 +++
 ...m_multi_attempt_app_json_2__expectation.json |  193 +++
 ...ask_list_w__offset___length_expectation.json | 1401 ++++++++++++++++++
 .../stage_task_list_w__sortBy_expectation.json  |  561 +++++++
 ...ortBy_short_names___runtime_expectation.json |  561 +++++++
 ...sortBy_short_names__runtime_expectation.json |  561 +++++++
 ...summary_w__custom_quantiles_expectation.json |   19 +
 ...task_summary_w_shuffle_read_expectation.json |   19 +
 ...ask_summary_w_shuffle_write_expectation.json |   19 +
 ...stage_with_accumulable_json_expectation.json |  242 +++
 ...ceeded_failed_job_list_json_expectation.json |   43 +
 .../succeeded_job_list_json_expectation.json    |   29 +
 .../local-1422981759269/APPLICATION_COMPLETE    |    0
 .../local-1422981759269/EVENT_LOG_1             |   88 ++
 .../local-1422981759269/SPARK_VERSION_1.2.0     |    0
 .../local-1422981780767/APPLICATION_COMPLETE    |    0
 .../local-1422981780767/EVENT_LOG_1             |   82 +
 .../local-1422981780767/SPARK_VERSION_1.2.0     |    0
 .../local-1425081759269/APPLICATION_COMPLETE    |    0
 .../local-1425081759269/EVENT_LOG_1             |   88 ++
 .../local-1425081759269/SPARK_VERSION_1.2.0     |    0
 .../local-1426533911241/APPLICATION_COMPLETE    |    0
 .../local-1426533911241/EVENT_LOG_1             |   24 +
 .../local-1426533911241/SPARK_VERSION_1.2.0     |    0
 .../local-1426633911242/APPLICATION_COMPLETE    |    0
 .../local-1426633911242/EVENT_LOG_1             |   24 +
 .../local-1426633911242/SPARK_VERSION_1.2.0     |    0
 .../resources/spark-events/local-1430917381534  |  231 +++
 .../scala/org/apache/spark/JsonTestUtils.scala  |   34 +
 .../apache/spark/deploy/JsonProtocolSuite.scala |   14 +-
 .../deploy/history/HistoryServerSuite.scala     |  231 ++-
 .../status/api/v1/SimpleDateParamTest.scala     |   29 +
 .../org/apache/spark/ui/UISeleniumSuite.scala   |  264 +++-
 docs/monitoring.md                              |   74 +
 pom.xml                                         |   12 +
 100 files changed, 8608 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index dccf2db..dc08c4a 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -74,5 +74,12 @@ logs
 .*scalastyle-output.xml
 .*dependency-reduced-pom.xml
 known_translations
+json_expectation
+local-1422981759269/*
+local-1422981780767/*
+local-1425081759269/*
+local-1426533911241/*
+local-1426633911242/*
+local-1430917381534/*
 DESCRIPTION
 NAMESPACE

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 164a836..fc42f48 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -229,6 +229,14 @@
       <version>3.2.10</version>
     </dependency>
     <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-core</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.mesos</groupId>
       <artifactId>mesos</artifactId>
       <classifier>${mesos.classifier}</classifier>

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/java/org/apache/spark/JobExecutionStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/JobExecutionStatus.java 
b/core/src/main/java/org/apache/spark/JobExecutionStatus.java
index 6e16131..0287fb7 100644
--- a/core/src/main/java/org/apache/spark/JobExecutionStatus.java
+++ b/core/src/main/java/org/apache/spark/JobExecutionStatus.java
@@ -17,9 +17,15 @@
 
 package org.apache.spark;
 
+import org.apache.spark.util.EnumUtil;
+
 public enum JobExecutionStatus {
   RUNNING,
   SUCCEEDED,
   FAILED,
-  UNKNOWN
+  UNKNOWN;
+
+  public static JobExecutionStatus fromString(String str) {
+    return EnumUtil.parseIgnoreCase(JobExecutionStatus.class, str);
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/java/org/apache/spark/status/api/v1/ApplicationStatus.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/status/api/v1/ApplicationStatus.java 
b/core/src/main/java/org/apache/spark/status/api/v1/ApplicationStatus.java
new file mode 100644
index 0000000..8c7dcf7
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/status/api/v1/ApplicationStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1;
+
+import org.apache.spark.util.EnumUtil;
+
+public enum ApplicationStatus {
+  COMPLETED,
+  RUNNING;
+
+  public static ApplicationStatus fromString(String str) {
+    return EnumUtil.parseIgnoreCase(ApplicationStatus.class, str);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java 
b/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java
new file mode 100644
index 0000000..9dbb565
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1;
+
+import org.apache.spark.util.EnumUtil;
+
+public enum StageStatus {
+  ACTIVE,
+  COMPLETE,
+  FAILED,
+  PENDING;
+
+  public static StageStatus fromString(String str) {
+    return EnumUtil.parseIgnoreCase(StageStatus.class, str);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java 
b/core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java
new file mode 100644
index 0000000..f19ed01
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1;
+
+import org.apache.spark.util.EnumUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public enum TaskSorting {
+  ID,
+  INCREASING_RUNTIME("runtime"),
+  DECREASING_RUNTIME("-runtime");
+
+  private final Set<String> alternateNames;
+  private TaskSorting(String... names) {
+    alternateNames = new HashSet<String>();
+    for (String n: names) {
+      alternateNames.add(n);
+    }
+  }
+
+  public static TaskSorting fromString(String str) {
+    String lower = str.toLowerCase();
+    for (TaskSorting t: values()) {
+      if (t.alternateNames.contains(lower)) {
+        return t;
+      }
+    }
+    return EnumUtil.parseIgnoreCase(TaskSorting.class, str);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/java/org/apache/spark/util/EnumUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/EnumUtil.java 
b/core/src/main/java/org/apache/spark/util/EnumUtil.java
new file mode 100644
index 0000000..c40c7e7
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/util/EnumUtil.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util;
+
+import com.google.common.base.Joiner;
+import org.apache.spark.annotation.Private;
+
+@Private
+public class EnumUtil {
+  public static <E extends Enum<E>> E parseIgnoreCase(Class<E> clz, String 
str) {
+    E[] constants = clz.getEnumConstants();
+    if (str == null) {
+      return null;
+    }
+    for (E e : constants) {
+      if (e.name().equalsIgnoreCase(str)) {
+        return e;
+      }
+    }
+    throw new IllegalArgumentException(
+      String.format("Illegal type='%s'. Supported type values: %s",
+        str, Joiner.on(", ").join(constants)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b5f040c..b59f562 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -430,7 +430,7 @@ class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationCli
     _ui =
       if (conf.getBoolean("spark.ui.enabled", true)) {
         Some(SparkUI.createLiveUI(this, _conf, listenerBus, 
_jobProgressListener,
-          _env.securityManager,appName))
+          _env.securityManager,appName, startTime = startTime))
       } else {
         // For tests, do not enable the UI
         None

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
 
b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 6a5011a..298a820 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.history
 
 import org.apache.spark.ui.SparkUI
 
-private[history] case class ApplicationAttemptInfo(
+private[spark] case class ApplicationAttemptInfo(
     attemptId: Option[String],
     startTime: Long,
     endTime: Long,
@@ -27,7 +27,7 @@ private[history] case class ApplicationAttemptInfo(
     sparkUser: String,
     completed: Boolean = false)
 
-private[history] case class ApplicationHistoryInfo(
+private[spark] case class ApplicationHistoryInfo(
     id: String,
     name: String,
     attempts: List[ApplicationAttemptInfo])

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 993763f..45c2be3 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,23 +17,21 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{IOException, BufferedInputStream, FileNotFoundException, 
InputStream}
+import java.io.{BufferedInputStream, FileNotFoundException, IOException, 
InputStream}
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 
 import scala.collection.mutable
-import scala.concurrent.duration.Duration
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-
-import com.google.common.util.concurrent.MoreExecutors
-import org.apache.hadoop.fs.permission.AccessControlException
+import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.permission.AccessControlException
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 /**
  * A class that provides application history from event logs stored in the 
file system.
@@ -151,7 +149,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
             val conf = this.conf.clone()
             val appSecManager = new SecurityManager(conf)
             SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
-              HistoryServer.getAttemptURI(appId, attempt.attemptId))
+              HistoryServer.getAttemptURI(appId, attempt.attemptId), 
attempt.startTime)
             // Do not call ui.bind() to avoid creating a new server for each 
application
           }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 754c8e9..50522e6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -25,6 +25,7 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, 
ServletHolder}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.status.api.v1.{ApplicationInfo, 
ApplicationsListResource, JsonRootResource, UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{SignalLogger, Utils}
@@ -45,7 +46,7 @@ class HistoryServer(
     provider: ApplicationHistoryProvider,
     securityManager: SecurityManager,
     port: Int)
-  extends WebUI(securityManager, port, conf) with Logging {
+  extends WebUI(securityManager, port, conf) with Logging with UIRoot {
 
   // How many applications to retain
   private val retainedApplications = 
conf.getInt("spark.history.retainedApplications", 50)
@@ -56,7 +57,7 @@ class HistoryServer(
       require(parts.length == 1 || parts.length == 2, s"Invalid app key $key")
       val ui = provider
         .getAppUI(parts(0), if (parts.length > 1) Some(parts(1)) else None)
-        .getOrElse(throw new NoSuchElementException())
+        .getOrElse(throw new NoSuchElementException(s"no app with key $key"))
       attachSparkUI(ui)
       ui
     }
@@ -113,6 +114,10 @@ class HistoryServer(
     }
   }
 
+  def getSparkUI(appKey: String): Option[SparkUI] = {
+    Option(appCache.get(appKey))
+  }
+
   initialize()
 
   /**
@@ -123,6 +128,9 @@ class HistoryServer(
    */
   def initialize() {
     attachPage(new HistoryPage(this))
+
+    attachHandler(JsonRootResource.getJsonServlet(this))
+
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
 
     val contextHandler = new ServletContextHandler
@@ -160,7 +168,13 @@ class HistoryServer(
    *
    * @return List of all known applications.
    */
-  def getApplicationList(): Iterable[ApplicationHistoryInfo] = 
provider.getListing()
+  def getApplicationList(): Iterable[ApplicationHistoryInfo] = {
+    provider.getListing()
+  }
+
+  def getApplicationInfoList: Iterator[ApplicationInfo] = {
+    
getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
+  }
 
   /**
    * Returns the provider configuration to show in the listing page.

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index f59d550..1620e95 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.ApplicationDescription
 import org.apache.spark.util.Utils
 
-private[deploy] class ApplicationInfo(
+private[spark] class ApplicationInfo(
     val startTime: Long,
     val id: String,
     val desc: ApplicationDescription,

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0fac3cd..53e1903 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -754,9 +754,9 @@ private[master] class Master(
 
   /**
    * Rebuild a new SparkUI from the given application's event logs.
-   * Return whether this is successful.
+   * Return the UI if successful, else None
    */
-  private def rebuildSparkUI(app: ApplicationInfo): Boolean = {
+  private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
     val appName = app.desc.name
     val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
     try {
@@ -764,7 +764,7 @@ private[master] class Master(
         .getOrElse {
           // Event logging is not enabled for this application
           app.desc.appUiUrl = notFoundBasePath
-          return false
+          return None
         }
 
       val eventLogFilePrefix = EventLoggingListener.getLogPath(
@@ -787,7 +787,7 @@ private[master] class Master(
       val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), 
fs)
       val replayBus = new ReplayListenerBus()
       val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new 
SecurityManager(conf),
-        appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
+        appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", 
app.startTime)
       val maybeTruncated = 
eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS)
       try {
         replayBus.replay(logInput, eventLogFile, maybeTruncated)
@@ -798,7 +798,7 @@ private[master] class Master(
       webUi.attachSparkUI(ui)
       // Application UI is successfully rebuilt, so link the Master UI to it
       app.desc.appUiUrl = ui.basePath
-      true
+      Some(ui)
     } catch {
       case fnf: FileNotFoundException =>
         // Event logging is enabled for this application, but no event logs 
are found
@@ -808,7 +808,7 @@ private[master] class Master(
         msg += " Did you specify the correct logging directory?"
         msg = URLEncoder.encode(msg, "UTF-8")
         app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
-        false
+        None
       case e: Exception =>
         // Relay exception message to application UI page
         val title = s"Application history load error (${app.id})"
@@ -817,7 +817,7 @@ private[master] class Master(
         logError(msg, e)
         msg = URLEncoder.encode(msg, "UTF-8")
         app.desc.appUiUrl = notFoundBasePath + 
s"?msg=$msg&exception=$exception&title=$title"
-        false
+        None
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 273f077..06e265f 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -23,10 +23,8 @@ import scala.concurrent.Await
 import scala.xml.Node
 
 import akka.pattern.ask
-import org.json4s.JValue
-import org.json4s.JsonAST.JNothing
 
-import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
+import org.apache.spark.deploy.ExecutorState
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, 
RequestMasterState}
 import org.apache.spark.deploy.master.ExecutorDesc
 import org.apache.spark.ui.{UIUtils, WebUIPage}
@@ -38,21 +36,6 @@ private[ui] class ApplicationPage(parent: MasterWebUI) 
extends WebUIPage("app")
   private val timeout = parent.timeout
 
   /** Executor details for a particular application */
-  override def renderJson(request: HttpServletRequest): JValue = {
-    val appId = request.getParameter("appId")
-    val stateFuture = (master ? 
RequestMasterState)(timeout).mapTo[MasterStateResponse]
-    val state = Await.result(stateFuture, timeout)
-    val app = state.activeApps.find(_.id == appId).getOrElse({
-      state.completedApps.find(_.id == appId).getOrElse(null)
-    })
-    if (app == null) {
-      JNothing
-    } else {
-      JsonProtocol.writeApplicationInfo(app)
-    }
-  }
-
-  /** Executor details for a particular application */
   def render(request: HttpServletRequest): Seq[Node] = {
     val appId = request.getParameter("appId")
     val stateFuture = (master ? 
RequestMasterState)(timeout).mapTo[MasterStateResponse]

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 1f2c3fd..7569276 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -35,10 +35,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends 
WebUIPage("") {
   private val master = parent.masterActorRef
   private val timeout = parent.timeout
 
-  override def renderJson(request: HttpServletRequest): JValue = {
+  def getMasterState: MasterStateResponse = {
     val stateFuture = (master ? 
RequestMasterState)(timeout).mapTo[MasterStateResponse]
-    val state = Await.result(stateFuture, timeout)
-    JsonProtocol.writeMasterState(state)
+    Await.result(stateFuture, timeout)
+  }
+
+  override def renderJson(request: HttpServletRequest): JValue = {
+    JsonProtocol.writeMasterState(getMasterState)
   }
 
   def handleAppKillRequest(request: HttpServletRequest): Unit = {
@@ -68,8 +71,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends 
WebUIPage("") {
 
   /** Index view listing applications and executors */
   def render(request: HttpServletRequest): Seq[Node] = {
-    val stateFuture = (master ? 
RequestMasterState)(timeout).mapTo[MasterStateResponse]
-    val state = Await.result(stateFuture, timeout)
+    val state = getMasterState
 
     val workerHeaders = Seq("Worker Id", "Address", "State", "Cores", "Memory")
     val workers = state.workers.sortBy(_.id)

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index dea0a65..eb26e9f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.master.ui
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.master.Master
+import org.apache.spark.status.api.v1.{ApplicationsListResource, 
ApplicationInfo, JsonRootResource, UIRoot}
 import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.RpcUtils
@@ -28,12 +29,15 @@ import org.apache.spark.util.RpcUtils
  */
 private[master]
 class MasterWebUI(val master: Master, requestedPort: Int)
-  extends WebUI(master.securityMgr, requestedPort, master.conf, name = 
"MasterUI") with Logging {
+  extends WebUI(master.securityMgr, requestedPort, master.conf, name = 
"MasterUI") with Logging
+  with UIRoot {
 
   val masterActorRef = master.self
   val timeout = RpcUtils.askTimeout(master.conf)
   val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
 
+  val masterPage = new MasterPage(this)
+
   initialize()
 
   /** Initialize all components of the server. */
@@ -43,6 +47,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
     attachPage(new HistoryNotFoundPage(this))
     attachPage(masterPage)
     attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, 
"/static"))
+    attachHandler(JsonRootResource.getJsonServlet(this))
     attachHandler(createRedirectHandler(
       "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = 
Set("POST")))
     attachHandler(createRedirectHandler(
@@ -60,6 +65,23 @@ class MasterWebUI(val master: Master, requestedPort: Int)
     assert(serverInfo.isDefined, "Master UI must be bound to a server before 
detaching SparkUIs")
     ui.getHandlers.foreach(detachHandler)
   }
+
+  def getApplicationInfoList: Iterator[ApplicationInfo] = {
+    val state = masterPage.getMasterState
+    val activeApps = state.activeApps.sortBy(_.startTime).reverse
+    val completedApps = state.completedApps.sortBy(_.endTime).reverse
+    activeApps.iterator.map { 
ApplicationsListResource.convertApplicationInfo(_, false) } ++
+      completedApps.iterator.map { 
ApplicationsListResource.convertApplicationInfo(_, true) }
+  }
+
+  def getSparkUI(appId: String): Option[SparkUI] = {
+    val state = masterPage.getMasterState
+    val activeApps = state.activeApps.sortBy(_.startTime).reverse
+    val completedApps = state.completedApps.sortBy(_.endTime).reverse
+    (activeApps ++ completedApps).find { _.id == appId }.flatMap {
+      master.rebuildSparkUI
+    }
+  }
 }
 
 private[master] object MasterWebUI {

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
new file mode 100644
index 0000000..5783df5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.util.{Arrays, Date, List => JList}
+import javax.ws.rs._
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.JobProgressListener
+import org.apache.spark.ui.jobs.UIData.JobUIData
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class AllJobsResource(ui: SparkUI) {
+
+  @GET
+  def jobsList(@QueryParam("status") statuses: JList[JobExecutionStatus]): 
Seq[JobData] = {
+    val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
+      AllJobsResource.getStatusToJobs(ui)
+    val adjStatuses: JList[JobExecutionStatus] = {
+      if (statuses.isEmpty) {
+        Arrays.asList(JobExecutionStatus.values(): _*)
+      } else {
+        statuses
+      }
+    }
+    val jobInfos = for {
+      (status, jobs) <- statusToJobs
+      job <- jobs if adjStatuses.contains(status)
+    } yield {
+      AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
+    }
+    jobInfos.sortBy{- _.jobId}
+  }
+
+}
+
+private[v1] object AllJobsResource {
+
+  def getStatusToJobs(ui: SparkUI): Seq[(JobExecutionStatus, Seq[JobUIData])] 
= {
+    val statusToJobs = ui.jobProgressListener.synchronized {
+      Seq(
+        JobExecutionStatus.RUNNING -> 
ui.jobProgressListener.activeJobs.values.toSeq,
+        JobExecutionStatus.SUCCEEDED -> 
ui.jobProgressListener.completedJobs.toSeq,
+        JobExecutionStatus.FAILED -> 
ui.jobProgressListener.failedJobs.reverse.toSeq
+      )
+    }
+    statusToJobs
+  }
+
+  def convertJobData(
+      job: JobUIData,
+      listener: JobProgressListener,
+      includeStageDetails: Boolean): JobData = {
+    listener.synchronized {
+      val lastStageInfo = listener.stageIdToInfo.get(job.stageIds.max)
+      val lastStageData = lastStageInfo.flatMap { s =>
+        listener.stageIdToData.get((s.stageId, s.attemptId))
+      }
+      val lastStageName = lastStageInfo.map { _.name }.getOrElse("(Unknown 
Stage Name)")
+      val lastStageDescription = lastStageData.flatMap { _.description }
+      new JobData(
+        jobId = job.jobId,
+        name = lastStageName,
+        description = lastStageDescription,
+        submissionTime = job.submissionTime.map{new Date(_)},
+        completionTime = job.completionTime.map{new Date(_)},
+        stageIds = job.stageIds,
+        jobGroup = job.jobGroup,
+        status = job.status,
+        numTasks = job.numTasks,
+        numActiveTasks = job.numActiveTasks,
+        numCompletedTasks = job.numCompletedTasks,
+        numSkippedTasks = job.numSkippedTasks,
+        numFailedTasks = job.numFailedTasks,
+        numActiveStages = job.numActiveStages,
+        numCompletedStages = job.completedStageIndices.size,
+        numSkippedStages = job.numSkippedStages,
+        numFailedStages = job.numFailedStages
+      )
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
new file mode 100644
index 0000000..645ede2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.{GET, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.storage.{RDDInfo, StorageStatus, StorageUtils}
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.storage.StorageListener
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class AllRDDResource(ui: SparkUI) {
+
+  @GET
+  def rddList(): Seq[RDDStorageInfo] = {
+    val storageStatusList = ui.storageListener.storageStatusList
+    val rddInfos = ui.storageListener.rddInfoList
+    rddInfos.map{rddInfo =>
+      AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
+        includeDetails = false)
+    }
+  }
+
+}
+
+private[spark] object AllRDDResource {
+
+  def getRDDStorageInfo(
+      rddId: Int,
+      listener: StorageListener,
+      includeDetails: Boolean): Option[RDDStorageInfo] = {
+    val storageStatusList = listener.storageStatusList
+    listener.rddInfoList.find { _.id == rddId }.map { rddInfo =>
+      getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails)
+    }
+  }
+
+  def getRDDStorageInfo(
+      rddId: Int,
+      rddInfo: RDDInfo,
+      storageStatusList: Seq[StorageStatus],
+      includeDetails: Boolean): RDDStorageInfo = {
+    val workers = storageStatusList.map { (rddId, _) }
+    val blockLocations = StorageUtils.getRddBlockLocations(rddId, 
storageStatusList)
+    val blocks = storageStatusList
+      .flatMap { _.rddBlocksById(rddId) }
+      .sortWith { _._1.name < _._1.name }
+      .map { case (blockId, status) =>
+        (blockId, status, 
blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
+      }
+
+    val dataDistribution = if (includeDetails) {
+      Some(storageStatusList.map { status =>
+        new RDDDataDistribution(
+          address = status.blockManagerId.hostPort,
+          memoryUsed = status.memUsedByRdd(rddId),
+          memoryRemaining = status.memRemaining,
+          diskUsed = status.diskUsedByRdd(rddId)
+        ) } )
+    } else {
+      None
+    }
+    val partitions = if (includeDetails) {
+      Some(blocks.map { case (id, block, locations) =>
+        new RDDPartitionInfo(
+          blockName = id.name,
+          storageLevel = block.storageLevel.description,
+          memoryUsed = block.memSize,
+          diskUsed = block.diskSize,
+          executors = locations
+        )
+      } )
+    } else {
+      None
+    }
+
+    new RDDStorageInfo(
+      id = rddId,
+      name = rddInfo.name,
+      numPartitions = rddInfo.numPartitions,
+      numCachedPartitions = rddInfo.numCachedPartitions,
+      storageLevel = rddInfo.storageLevel.description,
+      memoryUsed = rddInfo.memSize,
+      diskUsed = rddInfo.diskSize,
+      dataDistribution = dataDistribution,
+      partitions = partitions
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
new file mode 100644
index 0000000..5060858
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.util.{Arrays, Date, List => JList}
+import javax.ws.rs.{GET, PathParam, Produces, QueryParam}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, 
OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => 
InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, 
TaskMetrics => InternalTaskMetrics}
+import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, 
StageInfo}
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
+import org.apache.spark.util.Distribution
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class AllStagesResource(ui: SparkUI) {
+
+  @GET
+  def stageList(@QueryParam("status") statuses: JList[StageStatus]): 
Seq[StageData] = {
+    val listener = ui.jobProgressListener
+    val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
+    val adjStatuses = {
+      if (statuses.isEmpty()) {
+        Arrays.asList(StageStatus.values(): _*)
+      } else {
+        statuses
+      }
+    }
+    for {
+      (status, stageList) <- stageAndStatus
+      stageInfo: StageInfo <- stageList if adjStatuses.contains(status)
+      stageUiData: StageUIData <- listener.synchronized {
+        listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId))
+      }
+    } yield {
+      AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, 
includeDetails = false)
+    }
+  }
+}
+
+private[v1] object AllStagesResource {
+  def stageUiToStageData(
+      status: StageStatus,
+      stageInfo: StageInfo,
+      stageUiData: StageUIData,
+      includeDetails: Boolean): StageData = {
+
+    val taskData = if (includeDetails) {
+      Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } 
)
+    } else {
+      None
+    }
+    val executorSummary = if (includeDetails) {
+      Some(stageUiData.executorSummary.map { case (k, summary) =>
+        k -> new ExecutorStageSummary(
+          taskTime = summary.taskTime,
+          failedTasks = summary.failedTasks,
+          succeededTasks = summary.succeededTasks,
+          inputBytes = summary.inputBytes,
+          outputBytes = summary.outputBytes,
+          shuffleRead = summary.shuffleRead,
+          shuffleWrite = summary.shuffleWrite,
+          memoryBytesSpilled = summary.memoryBytesSpilled,
+          diskBytesSpilled = summary.diskBytesSpilled
+        )
+      })
+    } else {
+      None
+    }
+
+    val accumulableInfo = stageUiData.accumulables.values.map { 
convertAccumulableInfo }.toSeq
+
+    new StageData(
+      status = status,
+      stageId = stageInfo.stageId,
+      attemptId = stageInfo.attemptId,
+      numActiveTasks = stageUiData.numActiveTasks,
+      numCompleteTasks = stageUiData.numCompleteTasks,
+      numFailedTasks = stageUiData.numFailedTasks,
+      executorRunTime = stageUiData.executorRunTime,
+      inputBytes = stageUiData.inputBytes,
+      inputRecords = stageUiData.inputRecords,
+      outputBytes = stageUiData.outputBytes,
+      outputRecords = stageUiData.outputRecords,
+      shuffleReadBytes = stageUiData.shuffleReadTotalBytes,
+      shuffleReadRecords = stageUiData.shuffleReadRecords,
+      shuffleWriteBytes = stageUiData.shuffleWriteBytes,
+      shuffleWriteRecords = stageUiData.shuffleWriteRecords,
+      memoryBytesSpilled = stageUiData.memoryBytesSpilled,
+      diskBytesSpilled = stageUiData.diskBytesSpilled,
+      schedulingPool = stageUiData.schedulingPool,
+      name = stageInfo.name,
+      details = stageInfo.details,
+      accumulatorUpdates = accumulableInfo,
+      tasks = taskData,
+      executorSummary = executorSummary
+    )
+  }
+
+  def stagesAndStatus(ui: SparkUI): Seq[(StageStatus, Seq[StageInfo])] = {
+    val listener = ui.jobProgressListener
+    listener.synchronized {
+      Seq(
+        StageStatus.ACTIVE -> listener.activeStages.values.toSeq,
+        StageStatus.COMPLETE -> listener.completedStages.reverse.toSeq,
+        StageStatus.FAILED -> listener.failedStages.reverse.toSeq,
+        StageStatus.PENDING -> listener.pendingStages.values.toSeq
+      )
+    }
+  }
+
+  def convertTaskData(uiData: TaskUIData): TaskData = {
+    new TaskData(
+      taskId = uiData.taskInfo.taskId,
+      index = uiData.taskInfo.index,
+      attempt = uiData.taskInfo.attempt,
+      launchTime = new Date(uiData.taskInfo.launchTime),
+      executorId = uiData.taskInfo.executorId,
+      host = uiData.taskInfo.host,
+      taskLocality = uiData.taskInfo.taskLocality.toString(),
+      speculative = uiData.taskInfo.speculative,
+      accumulatorUpdates = uiData.taskInfo.accumulables.map { 
convertAccumulableInfo },
+      errorMessage = uiData.errorMessage,
+      taskMetrics = uiData.taskMetrics.map { convertUiTaskMetrics }
+    )
+  }
+
+  def taskMetricDistributions(
+      allTaskData: Iterable[TaskUIData],
+      quantiles: Array[Double]): TaskMetricDistributions = {
+
+    val rawMetrics = allTaskData.flatMap{_.taskMetrics}.toSeq
+
+    def metricQuantiles(f: InternalTaskMetrics => Double): IndexedSeq[Double] =
+      Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles)
+
+    // We need to do a lot of similar munging to nested metrics here.  For 
each one,
+    // we want (a) extract the values for nested metrics (b) make a 
distribution for each metric
+    // (c) shove the distribution into the right field in our return type and 
(d) only return
+    // a result if the option is defined for any of the tasks.  MetricHelper 
is a little util
+    // to make it a little easier to deal w/ all of the nested options.  
Mostly it lets us just
+    // implement one "build" method, which just builds the quantiles for each 
field.
+
+    val inputMetrics: Option[InputMetricDistributions] =
+      new MetricHelper[InternalInputMetrics, 
InputMetricDistributions](rawMetrics, quantiles) {
+        def getSubmetrics(raw: InternalTaskMetrics): 
Option[InternalInputMetrics] = {
+          raw.inputMetrics
+        }
+
+        def build: InputMetricDistributions = new InputMetricDistributions(
+          bytesRead = submetricQuantiles(_.bytesRead),
+          recordsRead = submetricQuantiles(_.recordsRead)
+        )
+      }.metricOption
+
+    val outputMetrics: Option[OutputMetricDistributions] =
+      new MetricHelper[InternalOutputMetrics, 
OutputMetricDistributions](rawMetrics, quantiles) {
+        def getSubmetrics(raw:InternalTaskMetrics): 
Option[InternalOutputMetrics] = {
+          raw.outputMetrics
+        }
+        def build: OutputMetricDistributions = new OutputMetricDistributions(
+          bytesWritten = submetricQuantiles(_.bytesWritten),
+          recordsWritten = submetricQuantiles(_.recordsWritten)
+        )
+      }.metricOption
+
+    val shuffleReadMetrics: Option[ShuffleReadMetricDistributions] =
+      new MetricHelper[InternalShuffleReadMetrics, 
ShuffleReadMetricDistributions](rawMetrics,
+        quantiles) {
+        def getSubmetrics(raw: InternalTaskMetrics): 
Option[InternalShuffleReadMetrics] = {
+          raw.shuffleReadMetrics
+        }
+        def build: ShuffleReadMetricDistributions = new 
ShuffleReadMetricDistributions(
+          readBytes = submetricQuantiles(_.totalBytesRead),
+          readRecords = submetricQuantiles(_.recordsRead),
+          remoteBytesRead = submetricQuantiles(_.remoteBytesRead),
+          remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched),
+          localBlocksFetched = submetricQuantiles(_.localBlocksFetched),
+          totalBlocksFetched = submetricQuantiles(_.totalBlocksFetched),
+          fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
+        )
+      }.metricOption
+
+    val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions] =
+      new MetricHelper[InternalShuffleWriteMetrics, 
ShuffleWriteMetricDistributions](rawMetrics,
+        quantiles) {
+        def getSubmetrics(raw: InternalTaskMetrics): 
Option[InternalShuffleWriteMetrics] = {
+          raw.shuffleWriteMetrics
+        }
+        def build: ShuffleWriteMetricDistributions = new 
ShuffleWriteMetricDistributions(
+          writeBytes = submetricQuantiles(_.shuffleBytesWritten),
+          writeRecords = submetricQuantiles(_.shuffleRecordsWritten),
+          writeTime = submetricQuantiles(_.shuffleWriteTime)
+        )
+      }.metricOption
+
+    new TaskMetricDistributions(
+      quantiles = quantiles,
+      executorDeserializeTime = metricQuantiles(_.executorDeserializeTime),
+      executorRunTime = metricQuantiles(_.executorRunTime),
+      resultSize = metricQuantiles(_.resultSize),
+      jvmGcTime = metricQuantiles(_.jvmGCTime),
+      resultSerializationTime = metricQuantiles(_.resultSerializationTime),
+      memoryBytesSpilled = metricQuantiles(_.memoryBytesSpilled),
+      diskBytesSpilled = metricQuantiles(_.diskBytesSpilled),
+      inputMetrics = inputMetrics,
+      outputMetrics = outputMetrics,
+      shuffleReadMetrics = shuffleReadMetrics,
+      shuffleWriteMetrics = shuffleWriteMetrics
+    )
+  }
+
+  def convertAccumulableInfo(acc: InternalAccumulableInfo): AccumulableInfo = {
+    new AccumulableInfo(acc.id, acc.name, acc.update, acc.value)
+  }
+
+  def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = {
+    new TaskMetrics(
+      executorDeserializeTime = internal.executorDeserializeTime,
+      executorRunTime = internal.executorRunTime,
+      resultSize = internal.resultSize,
+      jvmGcTime = internal.jvmGCTime,
+      resultSerializationTime = internal.resultSerializationTime,
+      memoryBytesSpilled = internal.memoryBytesSpilled,
+      diskBytesSpilled = internal.diskBytesSpilled,
+      inputMetrics = internal.inputMetrics.map { convertInputMetrics },
+      outputMetrics = Option(internal.outputMetrics).flatten.map { 
convertOutputMetrics },
+      shuffleReadMetrics = internal.shuffleReadMetrics.map { 
convertShuffleReadMetrics },
+      shuffleWriteMetrics = internal.shuffleWriteMetrics.map { 
convertShuffleWriteMetrics }
+    )
+  }
+
+  def convertInputMetrics(internal: InternalInputMetrics): InputMetrics = {
+    new InputMetrics(
+      bytesRead = internal.bytesRead,
+      recordsRead = internal.recordsRead
+    )
+  }
+
+  def convertOutputMetrics(internal: InternalOutputMetrics): OutputMetrics = {
+    new OutputMetrics(
+      bytesWritten = internal.bytesWritten,
+      recordsWritten = internal.recordsWritten
+    )
+  }
+
+  def convertShuffleReadMetrics(internal: InternalShuffleReadMetrics): 
ShuffleReadMetrics = {
+    new ShuffleReadMetrics(
+      remoteBlocksFetched = internal.remoteBlocksFetched,
+      localBlocksFetched = internal.localBlocksFetched,
+      fetchWaitTime = internal.fetchWaitTime,
+      remoteBytesRead = internal.remoteBytesRead,
+      totalBlocksFetched = internal.totalBlocksFetched,
+      recordsRead = internal.recordsRead
+    )
+  }
+
+  def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): 
ShuffleWriteMetrics = {
+    new ShuffleWriteMetrics(
+      bytesWritten = internal.shuffleBytesWritten,
+      writeTime = internal.shuffleWriteTime,
+      recordsWritten = internal.shuffleRecordsWritten
+    )
+  }
+}
+
+/**
+ * Helper for getting distributions from nested metric types.  Many of the 
metrics we want are
+ * contained in options inside TaskMetrics (eg., ShuffleWriteMetrics). This 
makes it easy to handle
+ * the options (returning None if the metrics are all empty), and extract the 
quantiles for each
+ * metric.  After creating an instance, call metricOption to get the result 
type.
+ */
+private[v1] abstract class MetricHelper[I,O](
+    rawMetrics: Seq[InternalTaskMetrics],
+    quantiles: Array[Double]) {
+
+  def getSubmetrics(raw: InternalTaskMetrics): Option[I]
+
+  def build: O
+
+  val data: Seq[I] = rawMetrics.flatMap(getSubmetrics)
+
+  /** applies the given function to all input metrics, and returns the 
quantiles */
+  def submetricQuantiles(f: I => Double): IndexedSeq[Double] = {
+    Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles)
+  }
+
+  def metricOption: Option[O] = {
+    if (data.isEmpty) {
+      None
+    } else {
+      Some(build)
+    }
+  }
+}
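
As a side note for readers of this file: the comment above describes the
MetricHelper pattern, and the following is a small self-contained sketch of
that pattern under simplified assumptions.  The ShuffleRead/TaskMetrics case
classes and the naive percentile rule below are stand-ins invented here for
illustration; they are not Spark's real UI data types or its Distribution
class.

    object MetricHelperSketch {
      // Stand-in types (assumptions), not Spark's real UI data classes.
      case class ShuffleRead(remoteBytesRead: Long)
      case class TaskMetrics(shuffleRead: Option[ShuffleRead])

      // Same shape as MetricHelper above: pull an optional sub-metric out of
      // every task, then either report quantiles or None if no task had it.
      abstract class MetricHelper[I, O](raw: Seq[TaskMetrics], quantiles: Array[Double]) {
        def getSubmetrics(t: TaskMetrics): Option[I]
        def build: O
        val data: Seq[I] = raw.flatMap(getSubmetrics)
        // Naive nearest-rank percentile; Spark's Distribution uses its own rule.
        def submetricQuantiles(f: I => Double): IndexedSeq[Double] = {
          val sorted = data.map(f).sorted.toIndexedSeq
          quantiles.toIndexedSeq.map(q => sorted(((sorted.size - 1) * q).toInt))
        }
        def metricOption: Option[O] = if (data.isEmpty) None else Some(build)
      }

      def main(args: Array[String]): Unit = {
        val tasks = Seq(
          TaskMetrics(Some(ShuffleRead(100L))),
          TaskMetrics(None),                       // this task did no shuffle read
          TaskMetrics(Some(ShuffleRead(300L))))
        val helper = new MetricHelper[ShuffleRead, IndexedSeq[Double]](tasks, Array(0.0, 0.5, 1.0)) {
          def getSubmetrics(t: TaskMetrics): Option[ShuffleRead] = t.shuffleRead
          def build: IndexedSeq[Double] = submetricQuantiles(_.remoteBytesRead.toDouble)
        }
        println(helper.metricOption)  // Some(Vector(100.0, 100.0, 300.0))
      }
    }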

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
new file mode 100644
index 0000000..17b521f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.util.{Arrays, Date, List => JList}
+import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.deploy.history.ApplicationHistoryInfo
+import org.apache.spark.deploy.master.{ApplicationInfo => InternalApplicationInfo}
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class ApplicationListResource(uiRoot: UIRoot) {
+
+  @GET
+  def appList(
+      @QueryParam("status") status: JList[ApplicationStatus],
+      @DefaultValue("2010-01-01") @QueryParam("minDate") minDate: 
SimpleDateParam,
+      @DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: 
SimpleDateParam)
+  : Iterator[ApplicationInfo] = {
+    val allApps = uiRoot.getApplicationInfoList
+    val adjStatus = {
+      if (status.isEmpty) {
+        Arrays.asList(ApplicationStatus.values(): _*)
+      } else {
+        status
+      }
+    }
+    val includeCompleted = adjStatus.contains(ApplicationStatus.COMPLETED)
+    val includeRunning = adjStatus.contains(ApplicationStatus.RUNNING)
+    allApps.filter { app =>
+      val anyRunning = app.attempts.exists(!_.completed)
+      // if any attempt is still running, we consider the app to also still be running
+      val statusOk = (!anyRunning && includeCompleted) ||
+        (anyRunning && includeRunning)
+      // keep the app if *any* attempts fall in the right time window
+      val dateOk = app.attempts.exists { attempt =>
+        attempt.startTime.getTime >= minDate.timestamp &&
+          attempt.startTime.getTime <= maxDate.timestamp
+      }
+      statusOk && dateOk
+    }
+  }
+}
+
+private[spark] object ApplicationsListResource {
+  def appHistoryInfoToPublicAppInfo(app: ApplicationHistoryInfo): ApplicationInfo = {
+    new ApplicationInfo(
+      id = app.id,
+      name = app.name,
+      attempts = app.attempts.map { internalAttemptInfo =>
+        new ApplicationAttemptInfo(
+          attemptId = internalAttemptInfo.attemptId,
+          startTime = new Date(internalAttemptInfo.startTime),
+          endTime = new Date(internalAttemptInfo.endTime),
+          sparkUser = internalAttemptInfo.sparkUser,
+          completed = internalAttemptInfo.completed
+        )
+      }
+    )
+  }
+
+  def convertApplicationInfo(
+      internal: InternalApplicationInfo,
+      completed: Boolean): ApplicationInfo = {
+    // standalone application info always has just one attempt
+    new ApplicationInfo(
+      id = internal.id,
+      name = internal.desc.name,
+      attempts = Seq(new ApplicationAttemptInfo(
+        attemptId = None,
+        startTime = new Date(internal.startTime),
+        endTime = new Date(internal.endTime),
+        sparkUser = internal.desc.user,
+        completed = completed
+      ))
+    )
+  }
+
+}
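
A quick self-contained sketch of the filtering rule appList applies above: an
app counts as running when any attempt is still running, and it is kept only
when some attempt also starts inside the requested date window.  The App and
Attempt case classes are simplified stand-ins for the public ApplicationInfo
types, and the epoch-millisecond timestamps are arbitrary example values.

    object AppListFilterSketch {
      case class Attempt(startTime: Long, completed: Boolean)
      case class App(id: String, attempts: Seq[Attempt])

      def keep(app: App, includeCompleted: Boolean, includeRunning: Boolean,
          minDate: Long, maxDate: Long): Boolean = {
        val anyRunning = app.attempts.exists(!_.completed)
        val statusOk = (!anyRunning && includeCompleted) || (anyRunning && includeRunning)
        // keep the app if *any* attempt falls in the requested time window
        val dateOk = app.attempts.exists(a => a.startTime >= minDate && a.startTime <= maxDate)
        statusOk && dateOk
      }

      def main(args: Array[String]): Unit = {
        val app = App("app-1", Seq(Attempt(startTime = 1000L, completed = true)))
        // true: completed apps requested and the attempt starts inside the window
        println(keep(app, includeCompleted = true, includeRunning = false, minDate = 0L, maxDate = 2000L))
        // false: only running apps requested, but the single attempt has completed
        println(keep(app, includeCompleted = false, includeRunning = true, minDate = 0L, maxDate = 2000L))
      }
    }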

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
new file mode 100644
index 0000000..8ad4656
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.{GET, PathParam, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.exec.ExecutorsPage
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class ExecutorListResource(ui: SparkUI) {
+
+  @GET
+  def executorList(): Seq[ExecutorSummary] = {
+    val listener = ui.executorsListener
+    val storageStatusList = listener.storageStatusList
+    (0 until storageStatusList.size).map { statusId =>
+      ExecutorsPage.getExecInfo(listener, statusId)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
new file mode 100644
index 0000000..202a519
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.io.OutputStream
+import java.lang.annotation.Annotation
+import java.lang.reflect.Type
+import java.text.SimpleDateFormat
+import java.util.{Calendar, SimpleTimeZone}
+import javax.ws.rs.Produces
+import javax.ws.rs.core.{MediaType, MultivaluedMap}
+import javax.ws.rs.ext.{MessageBodyWriter, Provider}
+
+import com.fasterxml.jackson.annotation.JsonInclude
+import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
+
+/**
+ * This class converts the POJO metric responses into json, using jackson.
+ *
+ * This doesn't follow the standard jersey-jackson plugin options, because we want to stick
+ * with an old version of jersey (since we have it from yarn anyway) and don't want to pull in
+ * lots of dependencies from a new plugin.
+ *
+ * Note that jersey automatically discovers this class based on its package and its annotations.
+ */
+@Provider
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
+
+  val mapper = new ObjectMapper() {
+    override def writeValueAsString(t: Any): String = {
+      super.writeValueAsString(t)
+    }
+  }
+  mapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
+  mapper.enable(SerializationFeature.INDENT_OUTPUT)
+  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
+  mapper.setDateFormat(JacksonMessageWriter.makeISODateFormat)
+
+  override def isWriteable(
+      aClass: Class[_],
+      `type`: Type,
+      annotations: Array[Annotation],
+      mediaType: MediaType): Boolean = {
+      true
+  }
+
+  override def writeTo(
+      t: Object,
+      aClass: Class[_],
+      `type`: Type,
+      annotations: Array[Annotation],
+      mediaType: MediaType,
+      multivaluedMap: MultivaluedMap[String, AnyRef],
+      outputStream: OutputStream): Unit = {
+    t match {
+      case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8"))
+      case _ => mapper.writeValue(outputStream, t)
+    }
+  }
+
+  override def getSize(
+      t: Object,
+      aClass: Class[_],
+      `type`: Type,
+      annotations: Array[Annotation],
+      mediaType: MediaType): Long = {
+    -1L
+  }
+}
+
+private[spark] object JacksonMessageWriter {
+  def makeISODateFormat: SimpleDateFormat = {
+    val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'")
+    val cal = Calendar.getInstance(new SimpleTimeZone(0, "GMT"))
+    iso8601.setCalendar(cal)
+    iso8601
+  }
+}
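
A small pure-JDK sketch of the date rendering configured above:
makeISODateFormat serializes timestamps in GMT with a literal "GMT" suffix
rather than a numeric zone offset, so json date fields look like
1970-01-01T00:00:00.000GMT.

    import java.text.SimpleDateFormat
    import java.util.{Calendar, Date, SimpleTimeZone}

    object IsoDateSketch {
      def main(args: Array[String]): Unit = {
        // Same pattern and calendar as JacksonMessageWriter.makeISODateFormat.
        val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'")
        iso8601.setCalendar(Calendar.getInstance(new SimpleTimeZone(0, "GMT")))
        println(iso8601.format(new Date(0L)))  // prints 1970-01-01T00:00:00.000GMT
      }
    }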

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala
new file mode 100644
index 0000000..c3ec45f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.servlet.ServletContext
+import javax.ws.rs._
+import javax.ws.rs.core.{Context, Response}
+
+import com.sun.jersey.api.core.ResourceConfig
+import com.sun.jersey.spi.container.servlet.ServletContainer
+import org.eclipse.jetty.server.handler.ContextHandler
+import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.ui.SparkUI
+
+/**
+ * Main entry point for serving spark application metrics as json, using JAX-RS.
+ *
+ * Each resource should have endpoints that return **public** classes defined in api.scala.  Mima
+ * binary compatibility checks ensure that we don't inadvertently make changes that break the api.
+ * The returned objects are automatically converted to json by jackson with JacksonMessageWriter.
+ * In addition, there are a number of tests in HistoryServerSuite that compare the json to
+ * "golden files".  Any changes and additions should be reflected there as well -- see the notes
+ * in HistoryServerSuite.
+@Path("/v1")
+private[v1] class JsonRootResource extends UIRootFromServletContext {
+
+  @Path("applications")
+  def getApplicationList(): ApplicationListResource = {
+    new ApplicationListResource(uiRoot)
+  }
+
+  @Path("applications/{appId}")
+  def getApplication(): OneApplicationResource = {
+    new OneApplicationResource(uiRoot)
+  }
+
+  @Path("applications/{appId}/{attemptId}/jobs")
+  def getJobs(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): AllJobsResource = {
+    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+      new AllJobsResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/jobs")
+  def getJobs(@PathParam("appId") appId: String): AllJobsResource = {
+    uiRoot.withSparkUI(appId, None) { ui =>
+      new AllJobsResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/jobs/{jobId: \\d+}")
+  def getJob(@PathParam("appId") appId: String): OneJobResource = {
+    uiRoot.withSparkUI(appId, None) { ui =>
+      new OneJobResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/{attemptId}/jobs/{jobId: \\d+}")
+  def getJob(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): OneJobResource = {
+    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+      new OneJobResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/executors")
+  def getExecutors(@PathParam("appId") appId: String): ExecutorListResource = {
+    uiRoot.withSparkUI(appId, None) { ui =>
+      new ExecutorListResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/{attemptId}/executors")
+  def getExecutors(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): ExecutorListResource = {
+    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+      new ExecutorListResource(ui)
+    }
+  }
+
+
+  @Path("applications/{appId}/stages")
+  def getStages(@PathParam("appId") appId: String): AllStagesResource= {
+    uiRoot.withSparkUI(appId, None) { ui =>
+      new AllStagesResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/{attemptId}/stages")
+  def getStages(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): AllStagesResource= {
+    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+      new AllStagesResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/stages/{stageId: \\d+}")
+  def getStage(@PathParam("appId") appId: String): OneStageResource= {
+    uiRoot.withSparkUI(appId, None) { ui =>
+      new OneStageResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/{attemptId}/stages/{stageId: \\d+}")
+  def getStage(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): OneStageResource = {
+    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+      new OneStageResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/storage/rdd")
+  def getRdds(@PathParam("appId") appId: String): AllRDDResource = {
+    uiRoot.withSparkUI(appId, None) { ui =>
+      new AllRDDResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/{attemptId}/storage/rdd")
+  def getRdds(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): AllRDDResource = {
+    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+      new AllRDDResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/storage/rdd/{rddId: \\d+}")
+  def getRdd(@PathParam("appId") appId: String): OneRDDResource = {
+    uiRoot.withSparkUI(appId, None) { ui =>
+      new OneRDDResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/{attemptId}/storage/rdd/{rddId: \\d+}")
+  def getRdd(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): OneRDDResource = {
+    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+      new OneRDDResource(ui)
+    }
+  }
+
+}
+
+private[spark] object JsonRootResource {
+
+  def getJsonServlet(uiRoot: UIRoot): ServletContextHandler = {
+    val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
+    jerseyContext.setContextPath("/json")
+    val holder: ServletHolder = new ServletHolder(classOf[ServletContainer])
+    holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
+      "com.sun.jersey.api.core.PackagesResourceConfig")
+    holder.setInitParameter("com.sun.jersey.config.property.packages",
+      "org.apache.spark.status.api.v1")
+    holder.setInitParameter(ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS,
+      classOf[SecurityFilter].getCanonicalName)
+    UIRootFromServletContext.setUiRoot(jerseyContext, uiRoot)
+    jerseyContext.addServlet(holder, "/*")
+    jerseyContext
+  }
+}
+
+/**
+ * This trait is shared by all the root containers for application UI information --
+ * the HistoryServer, the Master UI, and the application UI.  This provides the common
+ * interface needed for them all to expose application info as json.
+ */
+private[spark] trait UIRoot {
+  def getSparkUI(appKey: String): Option[SparkUI]
+  def getApplicationInfoList: Iterator[ApplicationInfo]
+
+  /**
+   * Get the spark UI with the given appID, and apply a function
+   * to it.  If there is no such app, throw an appropriate exception
+   */
+  def withSparkUI[T](appId: String, attemptId: Option[String])(f: SparkUI => T): T = {
+    val appKey = attemptId.map(appId + "/" + _).getOrElse(appId)
+    getSparkUI(appKey) match {
+      case Some(ui) =>
+        f(ui)
+      case None => throw new NotFoundException("no such app: " + appId)
+    }
+  }
+  def securityManager: SecurityManager
+}
+
+private[v1] object UIRootFromServletContext {
+
+  private val attribute = getClass.getCanonicalName
+
+  def setUiRoot(contextHandler: ContextHandler, uiRoot: UIRoot): Unit = {
+    contextHandler.setAttribute(attribute, uiRoot)
+  }
+
+  def getUiRoot(context: ServletContext): UIRoot = {
+    context.getAttribute(attribute).asInstanceOf[UIRoot]
+  }
+}
+
+private[v1] trait UIRootFromServletContext {
+  @Context
+  var servletContext: ServletContext = _
+
+  def uiRoot: UIRoot = UIRootFromServletContext.getUiRoot(servletContext)
+}
+
+private[v1] class NotFoundException(msg: String) extends WebApplicationException(
+  new NoSuchElementException(msg),
+    Response
+      .status(Response.Status.NOT_FOUND)
+      .entity(ErrorWrapper(msg))
+      .build()
+)
+
+private[v1] class BadParameterException(msg: String) extends WebApplicationException(
+  new IllegalArgumentException(msg),
+  Response
+    .status(Response.Status.BAD_REQUEST)
+    .entity(ErrorWrapper(msg))
+    .build()
+) {
+  def this(param: String, exp: String, actual: String) = {
+    this(raw"""Bad value for parameter "$param".  Expected a $exp, got 
"$actual"""")
+  }
+}
+
+/**
+ * Signal to JacksonMessageWriter to not convert the message into json (which would result in an
+ * extra set of quotes).
+ */
+private[v1] case class ErrorWrapper(s: String)
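
Putting the pieces together: the servlet context is mounted at "/json" and
this resource class at "/v1", so (for example) the application list is served
under <ui address>/json/v1/applications, and an unknown id comes back as a 404
whose body is the plain ErrorWrapper message.  Below is a hedged client-side
sketch; the localhost:4040 address is an assumption for a locally running
application UI.

    import scala.io.Source

    object JsonApiClientSketch {
      def main(args: Array[String]): Unit = {
        val base = "http://localhost:4040/json/v1"   // hypothetical address of a live application UI
        // Fetch the json application list exposed by ApplicationListResource.
        val apps = Source.fromURL(s"$base/applications").mkString
        println(apps)
      }
    }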

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
new file mode 100644
index 0000000..b5ef726
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.core.MediaType
+import javax.ws.rs.{Produces, PathParam, GET}
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class OneApplicationResource(uiRoot: UIRoot) {
+
+  @GET
+  def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
+    val apps = uiRoot.getApplicationInfoList.find { _.id == appId }
+    apps.getOrElse(throw new NotFoundException("unknown app: " + appId))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
new file mode 100644
index 0000000..6d8a60d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.{PathParam, GET, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.UIData.JobUIData
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class OneJobResource(ui: SparkUI) {
+
+  @GET
+  def oneJob(@PathParam("jobId") jobId: Int): JobData = {
+    val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
+      AllJobsResource.getStatusToJobs(ui)
+    val jobOpt = statusToJobs.map {_._2} .flatten.find { jobInfo => jobInfo.jobId == jobId}
+    jobOpt.map { job =>
+      AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
+    }.getOrElse {
+      throw new NotFoundException("unknown job: " + jobId)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
new file mode 100644
index 0000000..07b224f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.{PathParam, GET, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.ui.SparkUI
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class OneRDDResource(ui: SparkUI) {
+
+  @GET
+  def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo  = {
+    AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse(
+      throw new NotFoundException(s"no rdd found w/ id $rddId")
+    )
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/532bfdad/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
new file mode 100644
index 0000000..fd24aea
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs._
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.SparkException
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.status.api.v1.StageStatus._
+import org.apache.spark.status.api.v1.TaskSorting._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.JobProgressListener
+import org.apache.spark.ui.jobs.UIData.StageUIData
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class OneStageResource(ui: SparkUI) {
+
+  @GET
+  @Path("")
+  def stageData(@PathParam("stageId") stageId: Int): Seq[StageData] = {
+    withStage(stageId){ stageAttempts =>
+      stageAttempts.map { stage =>
+        AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
+          includeDetails = true)
+      }
+    }
+  }
+
+  @GET
+  @Path("/{stageAttemptId: \\d+}")
+  def oneAttemptData(
+      @PathParam("stageId") stageId: Int,
+      @PathParam("stageAttemptId") stageAttemptId: Int): StageData = {
+    withStageAttempt(stageId, stageAttemptId) { stage =>
+      AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
+        includeDetails = true)
+    }
+  }
+
+  @GET
+  @Path("/{stageAttemptId: \\d+}/taskSummary")
+  def taskSummary(
+      @PathParam("stageId") stageId: Int,
+      @PathParam("stageAttemptId") stageAttemptId: Int,
+      @DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") 
quantileString: String)
+  : TaskMetricDistributions = {
+    withStageAttempt(stageId, stageAttemptId) { stage =>
+      val quantiles = quantileString.split(",").map { s =>
+        try {
+          s.toDouble
+        } catch {
+          case nfe: NumberFormatException =>
+            throw new BadParameterException("quantiles", "double", s)
+        }
+      }
+      AllStagesResource.taskMetricDistributions(stage.ui.taskData.values, quantiles)
+    }
+  }
+
+  @GET
+  @Path("/{stageAttemptId: \\d+}/taskList")
+  def taskList(
+      @PathParam("stageId") stageId: Int,
+      @PathParam("stageAttemptId") stageAttemptId: Int,
+      @DefaultValue("0") @QueryParam("offset") offset: Int,
+      @DefaultValue("20") @QueryParam("length") length: Int,
+      @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): 
Seq[TaskData] = {
+    withStageAttempt(stageId, stageAttemptId) { stage =>
+      val tasks = 
stage.ui.taskData.values.map{AllStagesResource.convertTaskData}.toIndexedSeq
+        .sorted(OneStageResource.ordering(sortBy))
+      tasks.slice(offset, offset + length)  
+    }
+  }
+
+  private case class StageStatusInfoUi(status: StageStatus, info: StageInfo, ui: StageUIData)
+
+  private def withStage[T](stageId: Int)(f: Seq[StageStatusInfoUi] => T): T = {
+    val stageAttempts = findStageStatusUIData(ui.jobProgressListener, stageId)
+    if (stageAttempts.isEmpty) {
+      throw new NotFoundException("unknown stage: " + stageId)
+    } else {
+      f(stageAttempts)
+    }
+  }
+
+  private def findStageStatusUIData(
+      listener: JobProgressListener,
+      stageId: Int): Seq[StageStatusInfoUi] = {
+    listener.synchronized {
+      def getStatusInfoUi(status: StageStatus, infos: Seq[StageInfo]): Seq[StageStatusInfoUi] = {
+        infos.filter { _.stageId == stageId }.map { info =>
+          val ui = listener.stageIdToData.getOrElse((info.stageId, info.attemptId),
+            // this is an internal error -- we should always have uiData
+            throw new SparkException(
+              s"no stage ui data found for stage: ${info.stageId}:${info.attemptId}")
+          )
+          StageStatusInfoUi(status, info, ui)
+        }
+      }
+      getStatusInfoUi(ACTIVE, listener.activeStages.values.toSeq) ++
+        getStatusInfoUi(COMPLETE, listener.completedStages) ++
+        getStatusInfoUi(FAILED, listener.failedStages) ++
+        getStatusInfoUi(PENDING, listener.pendingStages.values.toSeq)
+    }
+  }
+
+  private def withStageAttempt[T](
+      stageId: Int,
+      stageAttemptId: Int)
+      (f: StageStatusInfoUi => T): T = {
+    withStage(stageId) { attempts =>
+        val oneAttempt = attempts.find { stage => stage.info.attemptId == stageAttemptId }
+        oneAttempt match {
+          case Some(stage) =>
+            f(stage)
+          case None =>
+            val stageAttempts = attempts.map { _.info.attemptId }
+            throw new NotFoundException(s"unknown attempt for stage $stageId.  
" +
+              s"Found attempts: ${stageAttempts.mkString("[", ",", "]")}")
+        }
+    }
+  }
+}
+
+object OneStageResource {
+  def ordering(taskSorting: TaskSorting): Ordering[TaskData] = {
+    val extractor: (TaskData => Long) = td =>
+      taskSorting match {
+        case ID => td.taskId
+        case INCREASING_RUNTIME => td.taskMetrics.map{_.executorRunTime}.getOrElse(-1L)
+        case DECREASING_RUNTIME => -td.taskMetrics.map{_.executorRunTime}.getOrElse(-1L)
+      }
+    Ordering.by(extractor)
+  }
+}
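
Finally, a self-contained sketch of the sort rule in OneStageResource.ordering
above: tasks are ordered by id, by increasing runtime, or by decreasing
runtime (implemented by negating the extracted value).  The Task case class
and the string-based sort keys are simplified stand-ins for TaskData and the
TaskSorting enum.

    object TaskSortingSketch {
      // Stand-ins (assumptions) for TaskData and the TaskSorting enum values.
      case class Task(taskId: Long, runTime: Long)

      def ordering(sortBy: String): Ordering[Task] = {
        val extractor: Task => Long = sortBy match {
          case "ID"                 => (t: Task) => t.taskId
          case "INCREASING_RUNTIME" => (t: Task) => t.runTime
          case "DECREASING_RUNTIME" => (t: Task) => -t.runTime  // negate to reverse the sort
        }
        Ordering.by(extractor)
      }

      def main(args: Array[String]): Unit = {
        val tasks = Seq(Task(1, 300L), Task(2, 100L), Task(3, 200L))
        println(tasks.sorted(ordering("INCREASING_RUNTIME")))  // Task 2, then 3, then 1
        println(tasks.sorted(ordering("DECREASING_RUNTIME")))  // Task 1, then 3, then 2
      }
    }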

