[SPARK-3454] separate json endpoints for data in the UI

Exposes data available in the UI as json over http. Key points:

* new endpoints, handled independently of existing XyzPage classes. Root entrypoint is `JsonRootResource`
* Uses jersey + jackson for routing & converting POJOs into json
* tests against known results in `HistoryServerSuite`
* also fixes some minor issues w/ the UI -- synchronizing on access to `StorageListener` & `StorageStatusListener`, and fixing some inconsistencies w/ the way we handle retained jobs & stages.

Author: Imran Rashid <[email protected]>

Closes #4435 from squito/SPARK-3454 and squashes the following commits:

da1e35f [Imran Rashid] typos etc.
5e78b4f [Imran Rashid] fix rendering problems
5ae02ad [Imran Rashid] Merge branch 'master' into SPARK-3454
f016182 [Imran Rashid] change all constructors json-pojo class constructors to be private[spark] to protect us from mima-false-positives if we add fields
3347b72 [Imran Rashid] mark EnumUtil as @Private
ec140a2 [Imran Rashid] create @Private
cc1febf [Imran Rashid] add docs on the metrics-as-json api
cbaf287 [Imran Rashid] Merge branch 'master' into SPARK-3454
56db31e [Imran Rashid] update tests for mulit-attempt
7f3bc4e [Imran Rashid] Revert "add sbt-revolved plugin, to make it easier to start & stop http servers in sbt"
67008b4 [Imran Rashid] rats
9e51400 [Imran Rashid] style
c9bae1c [Imran Rashid] handle multiple attempts per app
b87cd63 [Imran Rashid] add sbt-revolved plugin, to make it easier to start & stop http servers in sbt
188762c [Imran Rashid] multi-attempt
2af11e5 [Imran Rashid] Merge branch 'master' into SPARK-3454
befff0c [Imran Rashid] review feedback
14ac3ed [Imran Rashid] jersey-core needs to be explicit; move version & scope to parent pom.xml
f90680e [Imran Rashid] Merge branch 'master' into SPARK-3454
dc8a7fe [Imran Rashid] style, fix errant comments
acb7ef6 [Imran Rashid] fix indentation
7bf1811 [Imran Rashid] move MetricHelper so mima doesnt think its exposed; comments
9d889d6 [Imran Rashid] undo some unnecessary changes
f48a7b0 [Imran Rashid] docs
52bbae8 [Imran Rashid]
StorageListener & StorageStatusListener needs to synchronize internally to be thread-safe
31c79ce [Imran Rashid] asm no longer needed for SPARK_PREPEND_CLASSES
b2f8b91 [Imran Rashid] @DeveloperApi
2e19be2 [Imran Rashid] lazily convert ApplicationInfo to avoid memory overhead
ba3d9d2 [Imran Rashid] upper case enums
39ac29c [Imran Rashid] move EnumUtil
d2bde77 [Imran Rashid] update error handling & scoping
4a234d3 [Imran Rashid] avoid jersey-media-json-jackson b/c of potential version conflicts
a157a2f [Imran Rashid] style
7bd4d15 [Imran Rashid] delete security test, since it doesnt do anything
a325563 [Imran Rashid] style
a9c5cf1 [Imran Rashid] undo changes superceeded by master
0c6f968 [Imran Rashid] update deps
1ed0d07 [Imran Rashid] Merge branch 'master' into SPARK-3454
4c92af6 [Imran Rashid] style
f2e63ad [Imran Rashid] Merge branch 'master' into SPARK-3454
c22b11f [Imran Rashid] fix compile error
9ea682c [Imran Rashid] go back to good ol' java enums
cf86175 [Imran Rashid] style
d493b38 [Imran Rashid] Merge branch 'master' into SPARK-3454
f05ae89 [Imran Rashid] add in ExecutorSummaryInfo for MiMa :(
101a698 [Imran Rashid] style
d2ef58d [Imran Rashid] revert changes that had HistoryServer refresh the application listing more often
b136e39b [Imran Rashid] Revert "add sbt-revolved plugin, to make it easier to start & stop http servers in sbt"
e031719 [Imran Rashid] fixes from review
1f53a66 [Imran Rashid] style
b4a7863 [Imran Rashid] fix compile error
2c8b7ee [Imran Rashid] rats
1578a4a [Imran Rashid] doc
674f8dc [Imran Rashid] more explicit about total numbers of jobs & stages vs.
number retained
9922be0 [Imran Rashid] Merge branch 'master' into stage_distributions
f5a5196 [Imran Rashid] undo removal of renderJson from MasterPage, since there is no substitute yet
db61211 [Imran Rashid] get JobProgressListener directly from UI
fdfc181 [Imran Rashid] stage/taskList
63eb4a6 [Imran Rashid] tests for taskSummary
ad27de8 [Imran Rashid] error handling on quantile values
b2efcaf [Imran Rashid] cleanup, combine stage-related paths into one resource
aaba896 [Imran Rashid] wire up task summary
a4b1397 [Imran Rashid] stage metric distributions
e48ba32 [Imran Rashid] rename
eaf3bbb [Imran Rashid] style
25cd894 [Imran Rashid] if only given day, assume GMT
51eaedb [Imran Rashid] more visibility fixes
9f28b7e [Imran Rashid] ack, more cleanup
99764e1 [Imran Rashid] Merge branch 'SPARK-3454_w_jersey' into SPARK-3454
a61a43c [Imran Rashid] oops, remove accidental checkin
a066055 [Imran Rashid] set visibility on a lot of classes
1f361c8 [Imran Rashid] update rat-excludes
0be5120 [Imran Rashid] Merge branch 'master' into SPARK-3454_w_jersey
2382bef [Imran Rashid] switch to using new "enum"
fef6605 [Imran Rashid] some utils for working w/ new "enum" format
dbfc7bf [Imran Rashid] style
b86bcb0 [Imran Rashid] update test to look at one stage attempt
5f9df24 [Imran Rashid] style
7fd156a [Imran Rashid] refactor jsonDiff to avoid code duplication
73f1378 [Imran Rashid] test json; also add test cases for cleaned stages & jobs
97d411f [Imran Rashid] json endpoint for one job
0c96147 [Imran Rashid] better error msgs for bad stageId vs bad attemptId
dddbd29 [Imran Rashid] stages have attempt; jobs are sorted; resource for all attempts for one stage
190c17a [Imran Rashid] StagePage should distinguish no task data, from unknown stage
84cd497 [Imran Rashid] AllJobsPage should still report correct completed & failed job count, even if some have been cleaned, to make it consistent w/ AllStagesPage
36e4062 [Imran Rashid] SparkUI needs to know about startTime, so it can list its
own applicationInfo
b4c75ed [Imran Rashid] fix merge conflicts; need to widen visibility in a few cases
e91750a [Imran Rashid] Merge branch 'master' into SPARK-3454_w_jersey
56d2fc7 [Imran Rashid] jersey needs asm for SPARK_PREPEND_CLASSES to work
f7df095 [Imran Rashid] add test for accumulables, and discover that I need update after all
9c0c125 [Imran Rashid] add accumulableInfo
00e9cc5 [Imran Rashid] more style
3377e61 [Imran Rashid] scaladoc
d05f7a9 [Imran Rashid] dont use case classes for status api POJOs, since they have binary compatibility issues
654cecf [Imran Rashid] move all the status api POJOs to one file
b86e2b0 [Imran Rashid] style
18a8c45 [Imran Rashid] Merge branch 'master' into SPARK-3454_w_jersey
5598f19 [Imran Rashid] delete some unnecessary code, more to go
56edce0 [Imran Rashid] style
017c755 [Imran Rashid] add in metrics now available
1b78cb7 [Imran Rashid] fix some import ordering
0dc3ea7 [Imran Rashid] if app isnt found, reload apps from FS before giving up
c7d884f [Imran Rashid] fix merge conflicts
0c12b50 [Imran Rashid] Merge branch 'master' into SPARK-3454_w_jersey
b6a96a8 [Imran Rashid] compare json by AST, not string
cd37845 [Imran Rashid] switch to using java.util.Dates for times
a4ab5aa [Imran Rashid] add in explicit dependency on jersey 1.9 -- maven wasn't happy before this
4fdc39f [Imran Rashid] refactor case insensitive enum parsing
cba1ef6 [Imran Rashid] add security (maybe?) for metrics json
f0264a7 [Imran Rashid] switch to using jersey for metrics json
bceb3a9 [Imran Rashid] set http response code on error, some testing
e0356b6 [Imran Rashid] put new test expectation files in rat excludes (is this OK?)
b252e7a [Imran Rashid] small cleanup of accidental changes
d1a8c92 [Imran Rashid] add sbt-revolved plugin, to make it easier to start & stop http servers in sbt
4b398d0 [Imran Rashid] expose UI data as json in new endpoints

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d4973580
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d4973580
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d4973580

Branch: refs/heads/master
Commit: d49735800db27239c11478aac4b0f2ec9df91a3f
Parents: 51f4620
Author: Imran Rashid <[email protected]>
Authored: Tue May 5 07:25:40 2015 -0500
Committer: Imran Rashid <[email protected]>
Committed: Tue May 5 07:25:40 2015 -0500

----------------------------------------------------------------------
 .rat-excludes | 7 +
 core/pom.xml | 8 +
 .../org/apache/spark/JobExecutionStatus.java | 8 +-
 .../spark/status/api/v1/ApplicationStatus.java | 30 +
 .../apache/spark/status/api/v1/StageStatus.java | 31 +
 .../apache/spark/status/api/v1/TaskSorting.java | 48 +
 .../java/org/apache/spark/util/EnumUtil.java | 38 +
 .../scala/org/apache/spark/SparkContext.scala | 2 +-
 .../org/apache/spark/annotation/Private.java | 41 +
 .../history/ApplicationHistoryProvider.scala | 4 +-
 .../deploy/history/FsHistoryProvider.scala | 14 +-
 .../spark/deploy/history/HistoryServer.scala | 20 +-
 .../spark/deploy/master/ApplicationInfo.scala | 2 +-
 .../org/apache/spark/deploy/master/Master.scala | 14 +-
 .../deploy/master/ui/ApplicationPage.scala | 19 +-
 .../spark/deploy/master/ui/MasterPage.scala | 12 +-
 .../spark/deploy/master/ui/MasterWebUI.scala | 24 +-
 .../spark/status/api/v1/AllJobsResource.scala | 98 +
 .../spark/status/api/v1/AllRDDResource.scala | 104 +
 .../spark/status/api/v1/AllStagesResource.scala | 309 +
 .../status/api/v1/ApplicationListResource.scala | 94 +
 .../status/api/v1/ExecutorListResource.scala | 36 +
 .../status/api/v1/JacksonMessageWriter.scala | 93 +
 .../spark/status/api/v1/JsonRootResource.scala | 255 +
 .../status/api/v1/OneApplicationResource.scala | 31 +
 .../spark/status/api/v1/OneJobResource.scala | 41 +
 .../spark/status/api/v1/OneRDDResource.scala | 34 +
 .../spark/status/api/v1/OneStageResource.scala | 150 +
 .../spark/status/api/v1/SecurityFilter.scala | 38 +
 .../spark/status/api/v1/SimpleDateParam.scala | 55 +
 .../org/apache/spark/status/api/v1/api.scala | 228 +
 .../spark/storage/StorageStatusListener.scala | 6 +-
 .../scala/org/apache/spark/ui/SparkUI.scala | 49 +-
 .../main/scala/org/apache/spark/ui/WebUI.scala | 8 +-
 .../apache/spark/ui/exec/ExecutorsPage.scala | 17 +-
 .../org/apache/spark/ui/jobs/AllJobsPage.scala | 14 +-
 .../apache/spark/ui/jobs/AllStagesPage.scala | 12 +-
 .../org/apache/spark/ui/jobs/JobPage.scala | 2 +-
 .../spark/ui/jobs/JobProgressListener.scala | 4 +
 .../org/apache/spark/ui/jobs/PoolPage.scala | 2 +-
 .../org/apache/spark/ui/jobs/StagePage.scala | 19 +-
 .../org/apache/spark/ui/storage/RDDPage.scala | 73 +-
 .../apache/spark/ui/storage/StoragePage.scala | 2 +-
 .../apache/spark/ui/storage/StorageTab.scala | 6 +-
 .../applications/json_expectation | 53 +
 .../executors/json_expectation | 17 +
 .../local-1422981780767/jobs/0/json_expectation | 15 +
 .../local-1422981780767/jobs/json_expectation | 43 +
 .../json_expectation | 43 +
 .../jobs?status=succeeded/json_expectation | 29 +
 .../local-1422981780767/json_expectation | 10 +
 .../stages/1/0/json_expectation | 270 +
 .../stages/1/json_expectation | 270 +
 .../local-1422981780767/stages/json_expectation | 89 +
 .../stages?status=complete/json_expectation | 67 +
 .../stages?status=failed/json_expectation | 23 +
 .../storage/rdd/0/json_expectation | 64 +
 .../storage/rdd/json_expectation | 9 +
 .../local-1426533911241/1/jobs/json_expectation | 15 +
 .../1/stages/0/0/json_expectation | 242 +
 .../1/stages/0/0/taskList/json_expectation | 193 +
 .../1/stages/json_expectation | 27 +
 .../local-1426533911241/2/jobs/json_expectation | 15 +
 .../2/stages/0/0/taskList/json_expectation | 193 +
 .../local-1426533911241/json_expectation | 17 +
 .../stages/20/0/taskList/json_expectation | 481 +
 .../json_expectation | 1201 ++
 .../0/taskList?sortBy=-runtime/json_expectation | 481 +
 .../json_expectation | 481 +
 .../0/taskList?sortBy=runtime/json_expectation | 481 +
 .../stages/20/0/taskSummary/json_expectation | 15 +
 .../json_expectation | 15 +
 .../json_expectation | 10 +
 .../json_expectation | 19 +
 .../json_expectation | 35 +
 .../json_expectation | 53 +
 .../json_expectation | 1 +
 .../local-1422981759269/APPLICATION_COMPLETE | 0
 .../local-1422981759269/EVENT_LOG_1 | 88 +
 .../local-1422981759269/SPARK_VERSION_1.2.0 | 0
 .../local-1422981780767/APPLICATION_COMPLETE | 0
 .../local-1422981780767/EVENT_LOG_1 | 82 +
 .../local-1422981780767/SPARK_VERSION_1.2.0 | 0
 .../local-1425081759269/APPLICATION_COMPLETE | 0
 .../local-1425081759269/EVENT_LOG_1 | 88 +
 .../local-1425081759269/SPARK_VERSION_1.2.0 | 0
 .../local-1426533911241/APPLICATION_COMPLETE | 0
 .../local-1426533911241/EVENT_LOG_1 | 24 +
 .../local-1426533911241/SPARK_VERSION_1.2.0 | 0
 .../local-1426633911242/APPLICATION_COMPLETE | 0
 .../local-1426633911242/EVENT_LOG_1 | 24 +
 .../local-1426633911242/SPARK_VERSION_1.2.0 | 0
 .../resources/spark-events/local-1427397477963 | 12083 +++++++++++++++++
 .../scala/org/apache/spark/JsonTestUtils.scala | 34 +
 .../apache/spark/deploy/JsonProtocolSuite.scala | 14 +-
 .../deploy/history/HistoryServerSuite.scala | 223 +-
 .../status/api/v1/SimpleDateParamTest.scala | 29 +
 .../org/apache/spark/ui/UISeleniumSuite.scala | 264 +-
 docs/monitoring.md | 74 +
 pom.xml | 12 +
 100 files changed, 19946 insertions(+), 172 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index dccf2db..ac652ed 100644
--- a/.rat-excludes
+++
b/.rat-excludes
@@ -74,5 +74,12 @@ logs
 .*scalastyle-output.xml
 .*dependency-reduced-pom.xml
 known_translations
+json_expectation
+local-1422981759269/*
+local-1422981780767/*
+local-1425081759269/*
+local-1426533911241/*
+local-1426633911242/*
+local-1427397477963/*
 DESCRIPTION
 NAMESPACE

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 164a836..fc42f48 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -229,6 +229,14 @@
       <version>3.2.10</version>
     </dependency>
     <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-core</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.mesos</groupId>
       <artifactId>mesos</artifactId>
       <classifier>${mesos.classifier}</classifier>

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/java/org/apache/spark/JobExecutionStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/JobExecutionStatus.java b/core/src/main/java/org/apache/spark/JobExecutionStatus.java
index 6e16131..0287fb7 100644
--- a/core/src/main/java/org/apache/spark/JobExecutionStatus.java
+++ b/core/src/main/java/org/apache/spark/JobExecutionStatus.java
@@ -17,9 +17,15 @@
 
 package org.apache.spark;
 
+import org.apache.spark.util.EnumUtil;
+
 public enum JobExecutionStatus {
   RUNNING,
   SUCCEEDED,
   FAILED,
-  UNKNOWN
+  UNKNOWN;
+
+  public static JobExecutionStatus fromString(String str) {
+    return EnumUtil.parseIgnoreCase(JobExecutionStatus.class, str);
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/java/org/apache/spark/status/api/v1/ApplicationStatus.java
----------------------------------------------------------------------
diff --git
a/core/src/main/java/org/apache/spark/status/api/v1/ApplicationStatus.java b/core/src/main/java/org/apache/spark/status/api/v1/ApplicationStatus.java
new file mode 100644
index 0000000..8c7dcf7
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/status/api/v1/ApplicationStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1;
+
+import org.apache.spark.util.EnumUtil;
+
+public enum ApplicationStatus {
+  COMPLETED,
+  RUNNING;
+
+  public static ApplicationStatus fromString(String str) {
+    return EnumUtil.parseIgnoreCase(ApplicationStatus.class, str);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java b/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java
new file mode 100644
index 0000000..9dbb565
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1;
+
+import org.apache.spark.util.EnumUtil;
+
+public enum StageStatus {
+  ACTIVE,
+  COMPLETE,
+  FAILED,
+  PENDING;
+
+  public static StageStatus fromString(String str) {
+    return EnumUtil.parseIgnoreCase(StageStatus.class, str);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java b/core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java
new file mode 100644
index 0000000..f19ed01
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1;
+
+import org.apache.spark.util.EnumUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public enum TaskSorting {
+  ID,
+  INCREASING_RUNTIME("runtime"),
+  DECREASING_RUNTIME("-runtime");
+
+  private final Set<String> alternateNames;
+  private TaskSorting(String... names) {
+    alternateNames = new HashSet<String>();
+    for (String n: names) {
+      alternateNames.add(n);
+    }
+  }
+
+  public static TaskSorting fromString(String str) {
+    String lower = str.toLowerCase();
+    for (TaskSorting t: values()) {
+      if (t.alternateNames.contains(lower)) {
+        return t;
+      }
+    }
+    return EnumUtil.parseIgnoreCase(TaskSorting.class, str);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/java/org/apache/spark/util/EnumUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/EnumUtil.java b/core/src/main/java/org/apache/spark/util/EnumUtil.java
new file mode 100644
index 0000000..c40c7e7
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/util/EnumUtil.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util;
+
+import com.google.common.base.Joiner;
+import org.apache.spark.annotation.Private;
+
+@Private
+public class EnumUtil {
+  public static <E extends Enum<E>> E parseIgnoreCase(Class<E> clz, String str) {
+    E[] constants = clz.getEnumConstants();
+    if (str == null) {
+      return null;
+    }
+    for (E e : constants) {
+      if (e.name().equalsIgnoreCase(str)) {
+        return e;
+      }
+    }
+    throw new IllegalArgumentException(
+      String.format("Illegal type='%s'. Supported type values: %s",
+        str, Joiner.on(", ").join(constants)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b98a54b..7ebee99 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -428,7 +428,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _ui =
       if (conf.getBoolean("spark.ui.enabled", true)) {
         Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
-          _env.securityManager,appName))
+          _env.securityManager,appName, startTime = startTime))
       } else {
         // For tests, do not enable the UI
         None

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/annotation/Private.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/annotation/Private.java b/core/src/main/scala/org/apache/spark/annotation/Private.java
new file mode 100644
index 0000000..9082fcf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/annotation/Private.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * A class that is considered private to the internals of Spark -- there is a high-likelihood
+ * they will be changed in future versions of Spark.
+ *
+ * This should be used only when the standard Scala / Java means of protecting classes are
+ * insufficient. In particular, Java has no equivalent of private[spark], so we use this annotation
+ * in its place.
+ *
+ * NOTE: If there exists a Scaladoc comment that immediately precedes this annotation, the first
+ * line of the comment must be ":: Private ::" with no trailing blank line.
This is because
+ * of the known issue that Scaladoc displays only either the annotation or the comment, whichever
+ * comes first.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER,
+  ElementType.CONSTRUCTOR, ElementType.LOCAL_VARIABLE, ElementType.PACKAGE})
+public @interface Private {}

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 6a5011a..298a820 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.history
 
 import org.apache.spark.ui.SparkUI
 
-private[history] case class ApplicationAttemptInfo(
+private[spark] case class ApplicationAttemptInfo(
     attemptId: Option[String],
     startTime: Long,
     endTime: Long,
@@ -27,7 +27,7 @@ private[history] case class ApplicationAttemptInfo(
     sparkUser: String,
     completed: Boolean = false)
 
-private[history] case class ApplicationHistoryInfo(
+private[spark] case class ApplicationHistoryInfo(
     id: String,
     name: String,
     attempts: List[ApplicationAttemptInfo])

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 993763f..45c2be3 100644
---
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,23 +17,21 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{IOException, BufferedInputStream, FileNotFoundException, InputStream}
+import java.io.{BufferedInputStream, FileNotFoundException, IOException, InputStream}
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 
 import scala.collection.mutable
-import scala.concurrent.duration.Duration
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-
-import com.google.common.util.concurrent.MoreExecutors
-import org.apache.hadoop.fs.permission.AccessControlException
+import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.permission.AccessControlException
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 /**
  * A class that provides application history from event logs stored in the file system.
@@ -151,7 +149,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
             val conf = this.conf.clone()
             val appSecManager = new SecurityManager(conf)
             SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
-              HistoryServer.getAttemptURI(appId, attempt.attemptId))
+              HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
             // Do not call ui.bind() to avoid creating a new server for each application
           }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 754c8e9..50522e6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -25,6 +25,7 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.status.api.v1.{ApplicationInfo, ApplicationsListResource, JsonRootResource, UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{SignalLogger, Utils}
@@ -45,7 +46,7 @@ class HistoryServer(
     provider: ApplicationHistoryProvider,
     securityManager: SecurityManager,
     port: Int)
-  extends WebUI(securityManager, port, conf) with Logging {
+  extends WebUI(securityManager, port, conf) with Logging with UIRoot {
 
   // How many applications to retain
   private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
@@ -56,7 +57,7 @@ class HistoryServer(
       require(parts.length == 1 || parts.length == 2, s"Invalid app key $key")
       val ui = provider
         .getAppUI(parts(0), if (parts.length > 1)
Some(parts(1)) else None)
-        .getOrElse(throw new NoSuchElementException())
+        .getOrElse(throw new NoSuchElementException(s"no app with key $key"))
       attachSparkUI(ui)
       ui
     }
@@ -113,6 +114,10 @@ class HistoryServer(
     }
   }
 
+  def getSparkUI(appKey: String): Option[SparkUI] = {
+    Option(appCache.get(appKey))
+  }
+
   initialize()
 
   /**
@@ -123,6 +128,9 @@ class HistoryServer(
    */
   def initialize() {
     attachPage(new HistoryPage(this))
+
+    attachHandler(JsonRootResource.getJsonServlet(this))
+
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
 
     val contextHandler = new ServletContextHandler
@@ -160,7 +168,13 @@ class HistoryServer(
    *
    * @return List of all known applications.
    */
-  def getApplicationList(): Iterable[ApplicationHistoryInfo] = provider.getListing()
+  def getApplicationList(): Iterable[ApplicationHistoryInfo] = {
+    provider.getListing()
+  }
+
+  def getApplicationInfoList: Iterator[ApplicationInfo] = {
+    getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
+  }
 
   /**
    * Returns the provider configuration to show in the listing page.
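
Note on the HistoryServer change above: `getApplicationInfoList` returns an `Iterator` built with `iterator.map`, so each history record is converted to its public form only as the caller consumes it (compare commit 2e19be2, "lazily convert ApplicationInfo to avoid memory overhead"). The following is a minimal Java sketch of that idea, not the patch's code. `LazyListingSketch`, `HistoryInfo`, `PublicInfo`, `toPublic` and `mapLazily` are hypothetical names introduced only for illustration, and the conversion counter exists only to make the laziness observable.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class LazyListingSketch {

  /** Hypothetical stand-in for an internal history record. */
  public static final class HistoryInfo {
    public final String id;
    public final String name;
    public HistoryInfo(String id, String name) { this.id = id; this.name = name; }
  }

  /** Hypothetical stand-in for the public POJO that would be serialized to JSON. */
  public static final class PublicInfo {
    public final String id;
    public final String name;
    public PublicInfo(String id, String name) { this.id = id; this.name = name; }
  }

  /** Counts conversions so that the laziness can be observed. */
  public static int conversions = 0;

  /** Converts one internal record into its public form. */
  public static PublicInfo toPublic(HistoryInfo h) {
    conversions++;
    return new PublicInfo(h.id, h.name);
  }

  /** Wraps an iterator so each element is converted only when next() is called. */
  public static <A, B> Iterator<B> mapLazily(final Iterator<A> source, final Function<A, B> convert) {
    return new Iterator<B>() {
      @Override public boolean hasNext() { return source.hasNext(); }
      @Override public B next() { return convert.apply(source.next()); }
    };
  }

  public static void main(String[] args) {
    List<HistoryInfo> listing = Arrays.asList(
        new HistoryInfo("local-1", "first app"),
        new HistoryInfo("local-2", "second app"));
    // Building the iterator converts nothing yet.
    Iterator<PublicInfo> infos = mapLazily(listing.iterator(), LazyListingSketch::toPublic);
    System.out.println("converted before iteration: " + conversions);
    // Each call to next() converts exactly one record.
    PublicInfo first = infos.next();
    System.out.println(first.id + ", converted so far: " + conversions);
  }
}
```

With this shape a caller that only needs the first few applications never pays for converting the rest of the listing.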

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index f59d550..1620e95 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.ApplicationDescription
 import org.apache.spark.util.Utils
 
-private[deploy] class ApplicationInfo(
+private[spark] class ApplicationInfo(
     val startTime: Long,
     val id: String,
     val desc: ApplicationDescription,

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0fac3cd..53e1903 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -754,9 +754,9 @@ private[master] class Master(
 
   /**
    * Rebuild a new SparkUI from the given application's event logs.
-   * Return whether this is successful.
+ * Return the UI if successful, else None */ - private def rebuildSparkUI(app: ApplicationInfo): Boolean = { + private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = { val appName = app.desc.name val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found" try { @@ -764,7 +764,7 @@ private[master] class Master( .getOrElse { // Event logging is not enabled for this application app.desc.appUiUrl = notFoundBasePath - return false + return None } val eventLogFilePrefix = EventLoggingListener.getLogPath( @@ -787,7 +787,7 @@ private[master] class Master( val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs) val replayBus = new ReplayListenerBus() val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf), - appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}") + appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime) val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS) try { replayBus.replay(logInput, eventLogFile, maybeTruncated) @@ -798,7 +798,7 @@ private[master] class Master( webUi.attachSparkUI(ui) // Application UI is successfully rebuilt, so link the Master UI to it app.desc.appUiUrl = ui.basePath - true + Some(ui) } catch { case fnf: FileNotFoundException => // Event logging is enabled for this application, but no event logs are found @@ -808,7 +808,7 @@ private[master] class Master( msg += " Did you specify the correct logging directory?" 
msg = URLEncoder.encode(msg, "UTF-8") app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title" - false + None case e: Exception => // Relay exception message to application UI page val title = s"Application history load error (${app.id})" @@ -817,7 +817,7 @@ private[master] class Master( logError(msg, e) msg = URLEncoder.encode(msg, "UTF-8") app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title" - false + None } } http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 273f077..06e265f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -23,10 +23,8 @@ import scala.concurrent.Await import scala.xml.Node import akka.pattern.ask -import org.json4s.JValue -import org.json4s.JsonAST.JNothing -import org.apache.spark.deploy.{ExecutorState, JsonProtocol} +import org.apache.spark.deploy.ExecutorState import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.ExecutorDesc import org.apache.spark.ui.{UIUtils, WebUIPage} @@ -38,21 +36,6 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") private val timeout = parent.timeout /** Executor details for a particular application */ - override def renderJson(request: HttpServletRequest): JValue = { - val appId = request.getParameter("appId") - val stateFuture = (master ? 
RequestMasterState)(timeout).mapTo[MasterStateResponse] - val state = Await.result(stateFuture, timeout) - val app = state.activeApps.find(_.id == appId).getOrElse({ - state.completedApps.find(_.id == appId).getOrElse(null) - }) - if (app == null) { - JNothing - } else { - JsonProtocol.writeApplicationInfo(app) - } - } - - /** Executor details for a particular application */ def render(request: HttpServletRequest): Seq[Node] = { val appId = request.getParameter("appId") val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse] http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index 1f2c3fd..7569276 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -35,10 +35,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { private val master = parent.masterActorRef private val timeout = parent.timeout - override def renderJson(request: HttpServletRequest): JValue = { + def getMasterState: MasterStateResponse = { val stateFuture = (master ? 
RequestMasterState)(timeout).mapTo[MasterStateResponse] - val state = Await.result(stateFuture, timeout) - JsonProtocol.writeMasterState(state) + Await.result(stateFuture, timeout) + } + + override def renderJson(request: HttpServletRequest): JValue = { + JsonProtocol.writeMasterState(getMasterState) } def handleAppKillRequest(request: HttpServletRequest): Unit = { @@ -68,8 +71,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { /** Index view listing applications and executors */ def render(request: HttpServletRequest): Seq[Node] = { - val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse] - val state = Await.result(stateFuture, timeout) + val state = getMasterState val workerHeaders = Seq("Worker Id", "Address", "State", "Cores", "Memory") val workers = state.workers.sortBy(_.id) http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index dea0a65..eb26e9f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.master.ui import org.apache.spark.Logging import org.apache.spark.deploy.master.Master +import org.apache.spark.status.api.v1.{ApplicationsListResource, ApplicationInfo, JsonRootResource, UIRoot} import org.apache.spark.ui.{SparkUI, WebUI} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.RpcUtils @@ -28,12 +29,15 @@ import org.apache.spark.util.RpcUtils */ private[master] class MasterWebUI(val master: Master, requestedPort: Int) - extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging { + 
extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging + with UIRoot { val masterActorRef = master.self val timeout = RpcUtils.askTimeout(master.conf) val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true) + val masterPage = new MasterPage(this) + initialize() /** Initialize all components of the server. */ @@ -43,6 +47,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) attachPage(new HistoryNotFoundPage(this)) attachPage(masterPage) attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static")) + attachHandler(JsonRootResource.getJsonServlet(this)) attachHandler(createRedirectHandler( "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST"))) attachHandler(createRedirectHandler( @@ -60,6 +65,23 @@ class MasterWebUI(val master: Master, requestedPort: Int) assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs") ui.getHandlers.foreach(detachHandler) } + + def getApplicationInfoList: Iterator[ApplicationInfo] = { + val state = masterPage.getMasterState + val activeApps = state.activeApps.sortBy(_.startTime).reverse + val completedApps = state.completedApps.sortBy(_.endTime).reverse + activeApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, false) } ++ + completedApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, true) } + } + + def getSparkUI(appId: String): Option[SparkUI] = { + val state = masterPage.getMasterState + val activeApps = state.activeApps.sortBy(_.startTime).reverse + val completedApps = state.completedApps.sortBy(_.endTime).reverse + (activeApps ++ completedApps).find { _.id == appId }.flatMap { + master.rebuildSparkUI + } + } } private[master] object MasterWebUI { http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala new file mode 100644 index 0000000..5783df5 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.status.api.v1 + +import java.util.{Arrays, Date, List => JList} +import javax.ws.rs._ +import javax.ws.rs.core.MediaType + +import org.apache.spark.JobExecutionStatus +import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.jobs.JobProgressListener +import org.apache.spark.ui.jobs.UIData.JobUIData + +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class AllJobsResource(ui: SparkUI) { + + @GET + def jobsList(@QueryParam("status") statuses: JList[JobExecutionStatus]): Seq[JobData] = { + val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] = + AllJobsResource.getStatusToJobs(ui) + val adjStatuses: JList[JobExecutionStatus] = { + if (statuses.isEmpty) { + Arrays.asList(JobExecutionStatus.values(): _*) + } else { + statuses + } + } + val jobInfos = for { + (status, jobs) <- statusToJobs + job <- jobs if adjStatuses.contains(status) + } yield { + AllJobsResource.convertJobData(job, ui.jobProgressListener, false) + } + jobInfos.sortBy{- _.jobId} + } + +} + +private[v1] object AllJobsResource { + + def getStatusToJobs(ui: SparkUI): Seq[(JobExecutionStatus, Seq[JobUIData])] = { + val statusToJobs = ui.jobProgressListener.synchronized { + Seq( + JobExecutionStatus.RUNNING -> ui.jobProgressListener.activeJobs.values.toSeq, + JobExecutionStatus.SUCCEEDED -> ui.jobProgressListener.completedJobs.toSeq, + JobExecutionStatus.FAILED -> ui.jobProgressListener.failedJobs.reverse.toSeq + ) + } + statusToJobs + } + + def convertJobData( + job: JobUIData, + listener: JobProgressListener, + includeStageDetails: Boolean): JobData = { + listener.synchronized { + val lastStageInfo = listener.stageIdToInfo.get(job.stageIds.max) + val lastStageData = lastStageInfo.flatMap { s => + listener.stageIdToData.get((s.stageId, s.attemptId)) + } + val lastStageName = lastStageInfo.map { _.name }.getOrElse("(Unknown Stage Name)") + val lastStageDescription = lastStageData.flatMap { _.description } + new JobData( + jobId = job.jobId, + name = 
lastStageName, + description = lastStageDescription, + submissionTime = job.submissionTime.map{new Date(_)}, + completionTime = job.completionTime.map{new Date(_)}, + stageIds = job.stageIds, + jobGroup = job.jobGroup, + status = job.status, + numTasks = job.numTasks, + numActiveTasks = job.numActiveTasks, + numCompletedTasks = job.numCompletedTasks, + numSkippedTasks = job.numSkippedTasks, + numFailedTasks = job.numFailedTasks, + numActiveStages = job.numActiveStages, + numCompletedStages = job.completedStageIndices.size, + numSkippedStages = job.numSkippedStages, + numFailedStages = job.numFailedStages + ) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala new file mode 100644 index 0000000..645ede2 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.status.api.v1 + +import javax.ws.rs.{GET, Produces} +import javax.ws.rs.core.MediaType + +import org.apache.spark.storage.{RDDInfo, StorageStatus, StorageUtils} +import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.storage.StorageListener + +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class AllRDDResource(ui: SparkUI) { + + @GET + def rddList(): Seq[RDDStorageInfo] = { + val storageStatusList = ui.storageListener.storageStatusList + val rddInfos = ui.storageListener.rddInfoList + rddInfos.map{rddInfo => + AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList, + includeDetails = false) + } + } + +} + +private[spark] object AllRDDResource { + + def getRDDStorageInfo( + rddId: Int, + listener: StorageListener, + includeDetails: Boolean): Option[RDDStorageInfo] = { + val storageStatusList = listener.storageStatusList + listener.rddInfoList.find { _.id == rddId }.map { rddInfo => + getRDDStorageInfo(rddId, rddInfo, storageStatusList, includeDetails) + } + } + + def getRDDStorageInfo( + rddId: Int, + rddInfo: RDDInfo, + storageStatusList: Seq[StorageStatus], + includeDetails: Boolean): RDDStorageInfo = { + val workers = storageStatusList.map { (rddId, _) } + val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList) + val blocks = storageStatusList + .flatMap { _.rddBlocksById(rddId) } + .sortWith { _._1.name < _._1.name } + .map { case (blockId, status) => + (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown"))) + } + + val dataDistribution = if (includeDetails) { + Some(storageStatusList.map { status => + new RDDDataDistribution( + address = status.blockManagerId.hostPort, + memoryUsed = status.memUsedByRdd(rddId), + memoryRemaining = status.memRemaining, + diskUsed = status.diskUsedByRdd(rddId) + ) } ) + } else { + None + } + val partitions = if (includeDetails) { + Some(blocks.map { case (id, block, locations) => + new RDDPartitionInfo( + blockName 
= id.name, + storageLevel = block.storageLevel.description, + memoryUsed = block.memSize, + diskUsed = block.diskSize, + executors = locations + ) + } ) + } else { + None + } + + new RDDStorageInfo( + id = rddId, + name = rddInfo.name, + numPartitions = rddInfo.numPartitions, + numCachedPartitions = rddInfo.numCachedPartitions, + storageLevel = rddInfo.storageLevel.description, + memoryUsed = rddInfo.memSize, + diskUsed = rddInfo.diskSize, + dataDistribution = dataDistribution, + partitions = partitions + ) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala new file mode 100644 index 0000000..5060858 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.status.api.v1 + +import java.util.{Arrays, Date, List => JList} +import javax.ws.rs.{GET, PathParam, Produces, QueryParam} +import javax.ws.rs.core.MediaType + +import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, TaskMetrics => InternalTaskMetrics} +import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo} +import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData} +import org.apache.spark.util.Distribution + +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class AllStagesResource(ui: SparkUI) { + + @GET + def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = { + val listener = ui.jobProgressListener + val stageAndStatus = AllStagesResource.stagesAndStatus(ui) + val adjStatuses = { + if (statuses.isEmpty()) { + Arrays.asList(StageStatus.values(): _*) + } else { + statuses + } + } + for { + (status, stageList) <- stageAndStatus + stageInfo: StageInfo <- stageList if adjStatuses.contains(status) + stageUiData: StageUIData <- listener.synchronized { + listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)) + } + } yield { + AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, includeDetails = false) + } + } +} + +private[v1] object AllStagesResource { + def stageUiToStageData( + status: StageStatus, + stageInfo: StageInfo, + stageUiData: StageUIData, + includeDetails: Boolean): StageData = { + + val taskData = if (includeDetails) { + Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } ) + } else { + None + } + val executorSummary = if (includeDetails) { + Some(stageUiData.executorSummary.map { case (k, summary) => + k -> new ExecutorStageSummary( + taskTime = summary.taskTime, + failedTasks = summary.failedTasks, + 
succeededTasks = summary.succeededTasks, + inputBytes = summary.inputBytes, + outputBytes = summary.outputBytes, + shuffleRead = summary.shuffleRead, + shuffleWrite = summary.shuffleWrite, + memoryBytesSpilled = summary.memoryBytesSpilled, + diskBytesSpilled = summary.diskBytesSpilled + ) + }) + } else { + None + } + + val accumulableInfo = stageUiData.accumulables.values.map { convertAccumulableInfo }.toSeq + + new StageData( + status = status, + stageId = stageInfo.stageId, + attemptId = stageInfo.attemptId, + numActiveTasks = stageUiData.numActiveTasks, + numCompleteTasks = stageUiData.numCompleteTasks, + numFailedTasks = stageUiData.numFailedTasks, + executorRunTime = stageUiData.executorRunTime, + inputBytes = stageUiData.inputBytes, + inputRecords = stageUiData.inputRecords, + outputBytes = stageUiData.outputBytes, + outputRecords = stageUiData.outputRecords, + shuffleReadBytes = stageUiData.shuffleReadTotalBytes, + shuffleReadRecords = stageUiData.shuffleReadRecords, + shuffleWriteBytes = stageUiData.shuffleWriteBytes, + shuffleWriteRecords = stageUiData.shuffleWriteRecords, + memoryBytesSpilled = stageUiData.memoryBytesSpilled, + diskBytesSpilled = stageUiData.diskBytesSpilled, + schedulingPool = stageUiData.schedulingPool, + name = stageInfo.name, + details = stageInfo.details, + accumulatorUpdates = accumulableInfo, + tasks = taskData, + executorSummary = executorSummary + ) + } + + def stagesAndStatus(ui: SparkUI): Seq[(StageStatus, Seq[StageInfo])] = { + val listener = ui.jobProgressListener + listener.synchronized { + Seq( + StageStatus.ACTIVE -> listener.activeStages.values.toSeq, + StageStatus.COMPLETE -> listener.completedStages.reverse.toSeq, + StageStatus.FAILED -> listener.failedStages.reverse.toSeq, + StageStatus.PENDING -> listener.pendingStages.values.toSeq + ) + } + } + + def convertTaskData(uiData: TaskUIData): TaskData = { + new TaskData( + taskId = uiData.taskInfo.taskId, + index = uiData.taskInfo.index, + attempt = 
uiData.taskInfo.attempt, + launchTime = new Date(uiData.taskInfo.launchTime), + executorId = uiData.taskInfo.executorId, + host = uiData.taskInfo.host, + taskLocality = uiData.taskInfo.taskLocality.toString(), + speculative = uiData.taskInfo.speculative, + accumulatorUpdates = uiData.taskInfo.accumulables.map { convertAccumulableInfo }, + errorMessage = uiData.errorMessage, + taskMetrics = uiData.taskMetrics.map { convertUiTaskMetrics } + ) + } + + def taskMetricDistributions( + allTaskData: Iterable[TaskUIData], + quantiles: Array[Double]): TaskMetricDistributions = { + + val rawMetrics = allTaskData.flatMap{_.taskMetrics}.toSeq + + def metricQuantiles(f: InternalTaskMetrics => Double): IndexedSeq[Double] = + Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles) + + // We need to do a lot of similar munging to nested metrics here. For each one, + // we want (a) extract the values for nested metrics (b) make a distribution for each metric + // (c) shove the distribution into the right field in our return type and (d) only return + // a result if the option is defined for any of the tasks. MetricHelper is a little util + // to make it a little easier to deal w/ all of the nested options. Mostly it lets us just + // implement one "build" method, which just builds the quantiles for each field. 
+ + val inputMetrics: Option[InputMetricDistributions] = + new MetricHelper[InternalInputMetrics, InputMetricDistributions](rawMetrics, quantiles) { + def getSubmetrics(raw: InternalTaskMetrics): Option[InternalInputMetrics] = { + raw.inputMetrics + } + + def build: InputMetricDistributions = new InputMetricDistributions( + bytesRead = submetricQuantiles(_.bytesRead), + recordsRead = submetricQuantiles(_.recordsRead) + ) + }.metricOption + + val outputMetrics: Option[OutputMetricDistributions] = + new MetricHelper[InternalOutputMetrics, OutputMetricDistributions](rawMetrics, quantiles) { + def getSubmetrics(raw:InternalTaskMetrics): Option[InternalOutputMetrics] = { + raw.outputMetrics + } + def build: OutputMetricDistributions = new OutputMetricDistributions( + bytesWritten = submetricQuantiles(_.bytesWritten), + recordsWritten = submetricQuantiles(_.recordsWritten) + ) + }.metricOption + + val shuffleReadMetrics: Option[ShuffleReadMetricDistributions] = + new MetricHelper[InternalShuffleReadMetrics, ShuffleReadMetricDistributions](rawMetrics, + quantiles) { + def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleReadMetrics] = { + raw.shuffleReadMetrics + } + def build: ShuffleReadMetricDistributions = new ShuffleReadMetricDistributions( + readBytes = submetricQuantiles(_.totalBytesRead), + readRecords = submetricQuantiles(_.recordsRead), + remoteBytesRead = submetricQuantiles(_.remoteBytesRead), + remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched), + localBlocksFetched = submetricQuantiles(_.localBlocksFetched), + totalBlocksFetched = submetricQuantiles(_.totalBlocksFetched), + fetchWaitTime = submetricQuantiles(_.fetchWaitTime) + ) + }.metricOption + + val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions] = + new MetricHelper[InternalShuffleWriteMetrics, ShuffleWriteMetricDistributions](rawMetrics, + quantiles) { + def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleWriteMetrics] = { + raw.shuffleWriteMetrics 
+ } + def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions( + writeBytes = submetricQuantiles(_.shuffleBytesWritten), + writeRecords = submetricQuantiles(_.shuffleRecordsWritten), + writeTime = submetricQuantiles(_.shuffleWriteTime) + ) + }.metricOption + + new TaskMetricDistributions( + quantiles = quantiles, + executorDeserializeTime = metricQuantiles(_.executorDeserializeTime), + executorRunTime = metricQuantiles(_.executorRunTime), + resultSize = metricQuantiles(_.resultSize), + jvmGcTime = metricQuantiles(_.jvmGCTime), + resultSerializationTime = metricQuantiles(_.resultSerializationTime), + memoryBytesSpilled = metricQuantiles(_.memoryBytesSpilled), + diskBytesSpilled = metricQuantiles(_.diskBytesSpilled), + inputMetrics = inputMetrics, + outputMetrics = outputMetrics, + shuffleReadMetrics = shuffleReadMetrics, + shuffleWriteMetrics = shuffleWriteMetrics + ) + } + + def convertAccumulableInfo(acc: InternalAccumulableInfo): AccumulableInfo = { + new AccumulableInfo(acc.id, acc.name, acc.update, acc.value) + } + + def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = { + new TaskMetrics( + executorDeserializeTime = internal.executorDeserializeTime, + executorRunTime = internal.executorRunTime, + resultSize = internal.resultSize, + jvmGcTime = internal.jvmGCTime, + resultSerializationTime = internal.resultSerializationTime, + memoryBytesSpilled = internal.memoryBytesSpilled, + diskBytesSpilled = internal.diskBytesSpilled, + inputMetrics = internal.inputMetrics.map { convertInputMetrics }, + outputMetrics = Option(internal.outputMetrics).flatten.map { convertOutputMetrics }, + shuffleReadMetrics = internal.shuffleReadMetrics.map { convertShuffleReadMetrics }, + shuffleWriteMetrics = internal.shuffleWriteMetrics.map { convertShuffleWriteMetrics } + ) + } + + def convertInputMetrics(internal: InternalInputMetrics): InputMetrics = { + new InputMetrics( + bytesRead = internal.bytesRead, + recordsRead = internal.recordsRead 
+ ) + } + + def convertOutputMetrics(internal: InternalOutputMetrics): OutputMetrics = { + new OutputMetrics( + bytesWritten = internal.bytesWritten, + recordsWritten = internal.recordsWritten + ) + } + + def convertShuffleReadMetrics(internal: InternalShuffleReadMetrics): ShuffleReadMetrics = { + new ShuffleReadMetrics( + remoteBlocksFetched = internal.remoteBlocksFetched, + localBlocksFetched = internal.localBlocksFetched, + fetchWaitTime = internal.fetchWaitTime, + remoteBytesRead = internal.remoteBytesRead, + totalBlocksFetched = internal.totalBlocksFetched, + recordsRead = internal.recordsRead + ) + } + + def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = { + new ShuffleWriteMetrics( + bytesWritten = internal.shuffleBytesWritten, + writeTime = internal.shuffleWriteTime, + recordsWritten = internal.shuffleRecordsWritten + ) + } +} + +/** + * Helper for getting distributions from nested metric types. Many of the metrics we want are + * contained in options inside TaskMetrics (eg., ShuffleWriteMetrics). This makes it easy to handle + * the options (returning None if the metrics are all empty), and extract the quantiles for each + * metric. After creating an instance, call metricOption to get the result type. 
+ */ +private[v1] abstract class MetricHelper[I,O]( + rawMetrics: Seq[InternalTaskMetrics], + quantiles: Array[Double]) { + + def getSubmetrics(raw: InternalTaskMetrics): Option[I] + + def build: O + + val data: Seq[I] = rawMetrics.flatMap(getSubmetrics) + + /** applies the given function to all input metrics, and returns the quantiles */ + def submetricQuantiles(f: I => Double): IndexedSeq[Double] = { + Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles) + } + + def metricOption: Option[O] = { + if (data.isEmpty) { + None + } else { + Some(build) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala new file mode 100644 index 0000000..17b521f --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.spark.status.api.v1
+
+import java.util.{Arrays, Date, List => JList}
+import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.deploy.history.ApplicationHistoryInfo
+import org.apache.spark.deploy.master.{ApplicationInfo => InternalApplicationInfo}
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class ApplicationListResource(uiRoot: UIRoot) {
+
+  @GET
+  def appList(
+      @QueryParam("status") status: JList[ApplicationStatus],
+      @DefaultValue("2010-01-01") @QueryParam("minDate") minDate: SimpleDateParam,
+      @DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: SimpleDateParam)
+  : Iterator[ApplicationInfo] = {
+    val allApps = uiRoot.getApplicationInfoList
+    val adjStatus = {
+      if (status.isEmpty) {
+        Arrays.asList(ApplicationStatus.values(): _*)
+      } else {
+        status
+      }
+    }
+    val includeCompleted = adjStatus.contains(ApplicationStatus.COMPLETED)
+    val includeRunning = adjStatus.contains(ApplicationStatus.RUNNING)
+    allApps.filter { app =>
+      val anyRunning = app.attempts.exists(!_.completed)
+      // if any attempt is still running, we consider the app to also still be running
+      val statusOk = (!anyRunning && includeCompleted) ||
+        (anyRunning && includeRunning)
+      // keep the app if *any* attempts fall in the right time window
+      val dateOk = app.attempts.exists { attempt =>
+        attempt.startTime.getTime >= minDate.timestamp &&
+          attempt.startTime.getTime <= maxDate.timestamp
+      }
+      statusOk && dateOk
+    }
+  }
+}
+
+private[spark] object ApplicationsListResource {
+  def appHistoryInfoToPublicAppInfo(app: ApplicationHistoryInfo): ApplicationInfo = {
+    new ApplicationInfo(
+      id = app.id,
+      name = app.name,
+      attempts = app.attempts.map { internalAttemptInfo =>
+        new ApplicationAttemptInfo(
+          attemptId = internalAttemptInfo.attemptId,
+          startTime = new Date(internalAttemptInfo.startTime),
+          endTime = new Date(internalAttemptInfo.endTime),
+          sparkUser = internalAttemptInfo.sparkUser,
+          completed = internalAttemptInfo.completed
+        )
+      }
+    )
+  }
+
+  def convertApplicationInfo(
+      internal: InternalApplicationInfo,
+      completed: Boolean): ApplicationInfo = {
+    // standalone application info always has just one attempt
+    new ApplicationInfo(
+      id = internal.id,
+      name = internal.desc.name,
+      attempts = Seq(new ApplicationAttemptInfo(
+        attemptId = None,
+        startTime = new Date(internal.startTime),
+        endTime = new Date(internal.endTime),
+        sparkUser = internal.desc.user,
+        completed = completed
+      ))
+    )
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
new file mode 100644
index 0000000..8ad4656
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.{GET, PathParam, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.exec.ExecutorsPage
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class ExecutorListResource(ui: SparkUI) {
+
+  @GET
+  def executorList(): Seq[ExecutorSummary] = {
+    val listener = ui.executorsListener
+    val storageStatusList = listener.storageStatusList
+    (0 until storageStatusList.size).map { statusId =>
+      ExecutorsPage.getExecInfo(listener, statusId)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
new file mode 100644
index 0000000..202a519
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.io.OutputStream
+import java.lang.annotation.Annotation
+import java.lang.reflect.Type
+import java.text.SimpleDateFormat
+import java.util.{Calendar, SimpleTimeZone}
+import javax.ws.rs.Produces
+import javax.ws.rs.core.{MediaType, MultivaluedMap}
+import javax.ws.rs.ext.{MessageBodyWriter, Provider}
+
+import com.fasterxml.jackson.annotation.JsonInclude
+import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
+
+/**
+ * This class converts the POJO metric responses into json, using jackson.
+ *
+ * This doesn't follow the standard jersey-jackson plugin options, because we want to stick
+ * with an old version of jersey (since we have it from yarn anyway) and don't want to pull in lots
+ * of dependencies from a new plugin.
+ *
+ * Note that jersey automatically discovers this class based on its package and its annotations.
+ */
+@Provider
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
+
+  val mapper = new ObjectMapper() {
+    override def writeValueAsString(t: Any): String = {
+      super.writeValueAsString(t)
+    }
+  }
+  mapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
+  mapper.enable(SerializationFeature.INDENT_OUTPUT)
+  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
+  mapper.setDateFormat(JacksonMessageWriter.makeISODateFormat)
+
+  override def isWriteable(
+      aClass: Class[_],
+      `type`: Type,
+      annotations: Array[Annotation],
+      mediaType: MediaType): Boolean = {
+      true
+  }
+
+  override def writeTo(
+      t: Object,
+      aClass: Class[_],
+      `type`: Type,
+      annotations: Array[Annotation],
+      mediaType: MediaType,
+      multivaluedMap: MultivaluedMap[String, AnyRef],
+      outputStream: OutputStream): Unit = {
+    t match {
+      case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8"))
+      case _ => mapper.writeValue(outputStream, t)
+    }
+  }
+
+  override def getSize(
+      t: Object,
+      aClass: Class[_],
+      `type`: Type,
+      annotations: Array[Annotation],
+      mediaType: MediaType): Long = {
+    -1L
+  }
+}
+
+private[spark] object JacksonMessageWriter {
+  def makeISODateFormat: SimpleDateFormat = {
+    val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'")
+    val cal = Calendar.getInstance(new SimpleTimeZone(0, "GMT"))
+    iso8601.setCalendar(cal)
+    iso8601
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala
new file mode 100644
index 0000000..c3ec45f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.servlet.ServletContext
+import javax.ws.rs._
+import javax.ws.rs.core.{Context, Response}
+
+import com.sun.jersey.api.core.ResourceConfig
+import com.sun.jersey.spi.container.servlet.ServletContainer
+import org.eclipse.jetty.server.handler.ContextHandler
+import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.ui.SparkUI
+
+/**
+ * Main entry point for serving spark application metrics as json, using JAX-RS.
+ *
+ * Each resource should have endpoints that return **public** classes defined in api.scala. Mima
+ * binary compatibility checks ensure that we don't inadvertently make changes that break the api.
+ * The returned objects are automatically converted to json by jackson with JacksonMessageWriter.
+ * In addition, there are a number of tests in HistoryServerSuite that compare the json to "golden
+ * files". Any changes and additions should be reflected there as well -- see the notes in
+ * HistoryServerSuite.
+ */
+@Path("/v1")
+private[v1] class JsonRootResource extends UIRootFromServletContext {
+
+  @Path("applications")
+  def getApplicationList(): ApplicationListResource = {
+    new ApplicationListResource(uiRoot)
+  }
+
+  @Path("applications/{appId}")
+  def getApplication(): OneApplicationResource = {
+    new OneApplicationResource(uiRoot)
+  }
+
+  @Path("applications/{appId}/{attemptId}/jobs")
+  def getJobs(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): AllJobsResource = {
+    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+      new AllJobsResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/jobs")
+  def getJobs(@PathParam("appId") appId: String): AllJobsResource = {
+    uiRoot.withSparkUI(appId, None) { ui =>
+      new AllJobsResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/jobs/{jobId: \\d+}")
+  def getJob(@PathParam("appId") appId: String): OneJobResource = {
+    uiRoot.withSparkUI(appId, None) { ui =>
+      new OneJobResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/{attemptId}/jobs/{jobId: \\d+}")
+  def getJob(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): OneJobResource = {
+    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+      new OneJobResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/executors")
+  def getExecutors(@PathParam("appId") appId: String): ExecutorListResource = {
+    uiRoot.withSparkUI(appId, None) { ui =>
+      new ExecutorListResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/{attemptId}/executors")
+  def getExecutors(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): ExecutorListResource = {
+    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+      new ExecutorListResource(ui)
+    }
+  }
+
+
+  @Path("applications/{appId}/stages")
+  def getStages(@PathParam("appId") appId: String): AllStagesResource= {
+    uiRoot.withSparkUI(appId, None) { ui =>
+      new AllStagesResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/{attemptId}/stages")
+  def getStages(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): AllStagesResource= {
+    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+      new AllStagesResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/stages/{stageId: \\d+}")
+  def getStage(@PathParam("appId") appId: String): OneStageResource= {
+    uiRoot.withSparkUI(appId, None) { ui =>
+      new OneStageResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/{attemptId}/stages/{stageId: \\d+}")
+  def getStage(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): OneStageResource = {
+    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+      new OneStageResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/storage/rdd")
+  def getRdds(@PathParam("appId") appId: String): AllRDDResource = {
+    uiRoot.withSparkUI(appId, None) { ui =>
+      new AllRDDResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/{attemptId}/storage/rdd")
+  def getRdds(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): AllRDDResource = {
+    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+      new AllRDDResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/storage/rdd/{rddId: \\d+}")
+  def getRdd(@PathParam("appId") appId: String): OneRDDResource = {
+    uiRoot.withSparkUI(appId, None) { ui =>
+      new OneRDDResource(ui)
+    }
+  }
+
+  @Path("applications/{appId}/{attemptId}/storage/rdd/{rddId: \\d+}")
+  def getRdd(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): OneRDDResource = {
+    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+      new OneRDDResource(ui)
+    }
+  }
+
+}
+
+private[spark] object JsonRootResource {
+
+  def getJsonServlet(uiRoot: UIRoot): ServletContextHandler = {
+    val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
+    jerseyContext.setContextPath("/json")
+    val holder:ServletHolder = new ServletHolder(classOf[ServletContainer])
+    holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
+      "com.sun.jersey.api.core.PackagesResourceConfig")
+    holder.setInitParameter("com.sun.jersey.config.property.packages",
+      "org.apache.spark.status.api.v1")
+    holder.setInitParameter(ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS,
+      classOf[SecurityFilter].getCanonicalName)
+    UIRootFromServletContext.setUiRoot(jerseyContext, uiRoot)
+    jerseyContext.addServlet(holder, "/*")
+    jerseyContext
+  }
+}
+
+/**
+ * This trait is shared by the all the root containers for application UI information --
+ * the HistoryServer, the Master UI, and the application UI. This provides the common
+ * interface needed for them all to expose application info as json.
+ */
+private[spark] trait UIRoot {
+  def getSparkUI(appKey: String): Option[SparkUI]
+  def getApplicationInfoList: Iterator[ApplicationInfo]
+
+  /**
+   * Get the spark UI with the given appID, and apply a function
+   * to it. If there is no such app, throw an appropriate exception
+   */
+  def withSparkUI[T](appId: String, attemptId: Option[String])(f: SparkUI => T): T = {
+    val appKey = attemptId.map(appId + "/" + _).getOrElse(appId)
+    getSparkUI(appKey) match {
+      case Some(ui) =>
+        f(ui)
+      case None => throw new NotFoundException("no such app: " + appId)
+    }
+  }
+  def securityManager: SecurityManager
+}
+
+private[v1] object UIRootFromServletContext {
+
+  private val attribute = getClass.getCanonicalName
+
+  def setUiRoot(contextHandler: ContextHandler, uiRoot: UIRoot): Unit = {
+    contextHandler.setAttribute(attribute, uiRoot)
+  }
+
+  def getUiRoot(context: ServletContext): UIRoot = {
+    context.getAttribute(attribute).asInstanceOf[UIRoot]
+  }
+}
+
+private[v1] trait UIRootFromServletContext {
+  @Context
+  var servletContext: ServletContext = _
+
+  def uiRoot: UIRoot = UIRootFromServletContext.getUiRoot(servletContext)
+}
+
+private[v1] class NotFoundException(msg: String) extends WebApplicationException(
+  new NoSuchElementException(msg),
+    Response
+      .status(Response.Status.NOT_FOUND)
+      .entity(ErrorWrapper(msg))
+      .build()
+)
+
+private[v1] class BadParameterException(msg: String) extends WebApplicationException(
+  new IllegalArgumentException(msg),
+  Response
+    .status(Response.Status.BAD_REQUEST)
+    .entity(ErrorWrapper(msg))
+    .build()
+) {
+  def this(param: String, exp: String, actual: String) = {
+    this(raw"""Bad value for parameter "$param". Expected a $exp, got "$actual"""")
+  }
+}
+
+/**
+ * Signal to JacksonMessageWriter to not convert the message into json (which would result in an
+ * extra set of quotes).
+ */
+private[v1] case class ErrorWrapper(s: String)

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
new file mode 100644
index 0000000..b5ef726
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.core.MediaType
+import javax.ws.rs.{Produces, PathParam, GET}
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class OneApplicationResource(uiRoot: UIRoot) {
+
+  @GET
+  def getApp(@PathParam("appId") appId: String): ApplicationInfo = {
+    val apps = uiRoot.getApplicationInfoList.find { _.id == appId }
+    apps.getOrElse(throw new NotFoundException("unknown app: " + appId))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d4973580/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
new file mode 100644
index 0000000..6d8a60d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import javax.ws.rs.{PathParam, GET, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.UIData.JobUIData
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class OneJobResource(ui: SparkUI) {
+
+  @GET
+  def oneJob(@PathParam("jobId") jobId: Int): JobData = {
+    val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
+      AllJobsResource.getStatusToJobs(ui)
+    val jobOpt = statusToJobs.map {_._2} .flatten.find { jobInfo => jobInfo.jobId == jobId}
+    jobOpt.map { job =>
+      AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
+    }.getOrElse {
+      throw new NotFoundException("unknown job: " + jobId)
+    }
+  }
+
+}
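
Note on the filter in ApplicationListResource.appList: an app is treated as running if any attempt is incomplete, and it passes the date check if any attempt starts inside the requested window. Below is a minimal standalone sketch of that rule, not the code from the patch. `Attempt`, `App`, `AppFilterSketch` and `filterApps` are hypothetical stand-ins for illustration, and the status list is reduced to two booleans.

```scala
import java.util.Date

// Hypothetical stand-ins for the patch's ApplicationAttemptInfo / ApplicationInfo.
final case class Attempt(startTime: Date, completed: Boolean)
final case class App(id: String, attempts: Seq[Attempt])

object AppFilterSketch {
  /** Keeps apps whose status matches and that have an attempt starting in [minTs, maxTs]. */
  def filterApps(
      apps: Seq[App],
      includeCompleted: Boolean,
      includeRunning: Boolean,
      minTs: Long,
      maxTs: Long): Seq[App] = {
    apps.filter { app =>
      // An app counts as running if any of its attempts is still incomplete.
      val anyRunning = app.attempts.exists(!_.completed)
      val statusOk = (!anyRunning && includeCompleted) || (anyRunning && includeRunning)
      // One attempt inside the window is enough to keep the whole app.
      val dateOk = app.attempts.exists { a =>
        val t = a.startTime.getTime
        t >= minTs && t <= maxTs
      }
      statusOk && dateOk
    }
  }
}
```

With one app holding a completed and a running attempt, asking only for completed apps drops it, because the running attempt makes the whole app count as running.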

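
Note on JacksonMessageWriter.makeISODateFormat: the pattern carries a literal 'GMT' suffix and the formatter is pinned to a zero-offset calendar, so dates in the json do not depend on the server's time zone. The sketch below repeats that construction in a standalone object to show the resulting string shape. `IsoDateSketch` is a hypothetical name, and the example assumes a JVM whose default locale yields a Gregorian calendar.

```scala
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, SimpleTimeZone}

object IsoDateSketch {
  // Same construction as makeISODateFormat in the patch: fixed pattern, GMT calendar.
  def makeISODateFormat: SimpleDateFormat = {
    val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'")
    iso8601.setCalendar(Calendar.getInstance(new SimpleTimeZone(0, "GMT")))
    iso8601
  }

  def main(args: Array[String]): Unit = {
    val fmt = makeISODateFormat
    // Format the epoch, then parse the text back to check the round trip.
    val text = fmt.format(new Date(0L))
    println(text)
    println(fmt.parse(text).getTime)
  }
}
```

Since SimpleDateFormat instances are not thread-safe, a fresh formatter per caller (as the factory method allows) avoids sharing one across threads.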