This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push: new b2016f9 KYLIN-5023 Support cluster deployMode for Spark App which master is standalone b2016f9 is described below commit b2016f915db454e64b7018047b3df5c707d7946c Author: XiaoxiangYu <x...@apache.org> AuthorDate: Sat Jul 3 15:58:23 2021 +0800 KYLIN-5023 Support cluster deployMode for Spark App which master is standalone --- .../org/apache/kylin/common/KylinConfigBase.java | 3 + .../kylin/job/execution/AbstractExecutable.java | 6 +- .../apache/spark/sql/common/LocalMetadata.scala | 3 - .../kylin/engine/spark/job/NSparkExecutable.java | 16 +- .../kylin/engine/spark/utils/MetaDumpUtil.java | 2 +- .../kylin/engine/spark/utils/RestService.java | 93 ++++++++++++ .../spark/deploy/SparkApplicationClient.scala | 56 +++++++ .../apache/spark/deploy/StandaloneAppClient.scala | 116 ++++++++++++++ .../resources/response/application-detail-246.html | 167 +++++++++++++++++++++ .../resources/response/standalone-master-246.json | 94 ++++++++++++ .../spark/deploy/StandaloneAppClientTest.scala | 34 +++++ 11 files changed, 582 insertions(+), 8 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 0d2d11e..bccda65 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1541,6 +1541,9 @@ public abstract class KylinConfigBase implements Serializable { } + public String getSparkStandaloneMasterWebUI() { + return getOptional("kylin.engine.spark.standalone.master.httpUrl", ""); + } public String getKylinJobJarPath() { final String jobJar = getOptional(KYLIN_ENGINE_MR_JOB_JAR); diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 4c17d67..1f14508 100644 --- 
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -699,9 +699,9 @@ public abstract class AbstractExecutable implements Executable, Idempotent { return 1024; } - //Default driver memory base is 1024M - //Adujst driver memory by cuboid number with stratogy[2 -> 20 -> 100] - //If cuboid number is 10, then driver.memory=2*1024=2048M + // Default driver memory base is 1024M + // Adjust driver memory by cuboid number with stratogy[2 -> 20 -> 100] + // If cuboid number is 10, then driver.memory=2*1024=2048M public static Integer computeDriverMemory(Integer cuboidNum) { KylinConfig config = KylinConfig.getInstanceFromEnv(); int[] driverMemoryStrategy = config.getSparkEngineDriverMemoryStrategy(); diff --git a/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala b/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala index ab6b182..d7c7f5d 100644 --- a/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala +++ b/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala @@ -18,9 +18,6 @@ package org.apache.spark.sql.common -import java.io.{File, IOException} - -import org.apache.commons.io.FileUtils import org.apache.kylin.common.KylinConfig import org.apache.kylin.common.util.{LocalFileMetadataTestCase, TempMetadataBuilder} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite} diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java index efbe417..6859e65 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java +++ 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java @@ -63,6 +63,7 @@ import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.metadata.MetadataConstants; +import org.apache.spark.deploy.SparkApplicationClient; import org.apache.spark.utils.SparkVersionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,6 +84,7 @@ public class NSparkExecutable extends AbstractExecutable { private static final String APP_JAR_NAME = "__app__.jar"; private volatile boolean isYarnCluster = false; + private volatile boolean isStandaloneCluster = false; protected void setSparkSubmitClassName(String className) { this.setParam(MetadataConstants.P_CLASS_NAME, className); @@ -165,6 +167,9 @@ public class NSparkExecutable extends AbstractExecutable { } } + /** + * Dump metadata from Kylin Metadata and persist to HDFS(getDistMetaUrl) for Spark Application + */ void attachMetadataAndKylinProps(KylinConfig config) throws IOException { // The way of Updating metadata is CopyOnWrite. So it is safe to use Reference in the value. 
Set<String> dumpList = getMetadataDumpList(config); @@ -272,6 +277,9 @@ public class NSparkExecutable extends AbstractExecutable { CliCommandExecutor exec = new CliCommandExecutor(); exec.execute(cmd, patternedLogger, jobId); + if (isStandaloneCluster) { + SparkApplicationClient.awaitAndCheckAppState(SparkApplicationClient.STANDALONE_CLUSTER(), jobId); + } updateMetaAfterOperation(config); //Add metrics information to execute result for JobMetricsFacade getManager().addJobInfo(getId(), getJobMetricsInfo(config)); @@ -297,6 +305,12 @@ public class NSparkExecutable extends AbstractExecutable { && "cluster".equals(sparkConfigOverride.get(DEPLOY_MODE)) && !(this instanceof NSparkLocalStep)) { this.isYarnCluster = true; } + + if (sparkConfigOverride.get(SPARK_MASTER).toLowerCase(Locale.ROOT).startsWith("spark") + && "cluster".equals(sparkConfigOverride.get(DEPLOY_MODE)) && !(this instanceof NSparkLocalStep)) { + this.isStandaloneCluster = true; + } + if (!sparkConfigOverride.containsKey("spark.driver.memory")) { sparkConfigOverride.put("spark.driver.memory", computeStepDriverMemory() + "m"); } @@ -434,7 +448,7 @@ public class NSparkExecutable extends AbstractExecutable { private ExecuteResult runLocalMode(String appArgs, KylinConfig config) { try { Class<? 
extends Object> appClz = ClassUtil.forName(getSparkSubmitClassName(), Object.class); - appClz.getMethod("main", String[].class).invoke(null, (Object) new String[] { appArgs }); + appClz.getMethod("main", String[].class).invoke(null, (Object) new String[]{appArgs}); updateMetaAfterOperation(config); //Add metrics information to execute result for JobMetricsFacade diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java index c2ef2c9..9121bdb 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java @@ -80,7 +80,7 @@ public class MetaDumpUtil { } KylinConfig dstConfig = KylinConfig.createKylinConfig(props); - //upload metadata + // upload metadata new ResourceTool().copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig); } } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/RestService.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/RestService.java new file mode 100644 index 0000000..71f76cc --- /dev/null +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/RestService.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.utils;
+
+import java.io.IOException;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Minimal blocking HTTP helper which sends a request and returns the response body as text.
+ *
+ * A new HTTP client is created for every request and shut down afterwards, so instances hold
+ * nothing but the two default timeouts.
+ */
+public class RestService {
+    private static final Logger logger = LoggerFactory.getLogger(RestService.class);
+
+    /** Longest prefix of a response body that is written to the trace log. */
+    private static final int MAX_LOGGED_BODY_LENGTH = 500;
+
+    private final int connectionTimeout;
+    private final int readTimeout;
+
+    /**
+     * @param connectionTimeout default connect timeout, passed to HttpConnectionParams.setConnectionTimeout
+     * @param readTimeout       default socket read timeout, passed to HttpConnectionParams.setSoTimeout
+     */
+    public RestService(int connectionTimeout, int readTimeout) {
+        this.connectionTimeout = connectionTimeout;
+        this.readTimeout = readTimeout;
+    }
+
+    /**
+     * Sends a GET request using the timeouts given to the constructor.
+     *
+     * @param url the address to request
+     * @return the response body
+     * @throws IOException if the request fails or the response status is not 200
+     */
+    public String getRequest(String url) throws IOException {
+        return getRequest(url, connectionTimeout, readTimeout);
+    }
+
+    /**
+     * Sends a GET request using explicit timeouts.
+     *
+     * @param url         the address to request
+     * @param connTimeout connect timeout for this request
+     * @param readTimeout socket read timeout for this request
+     * @return the response body
+     * @throws IOException if the request fails or the response status is not 200
+     */
+    public String getRequest(String url, int connTimeout, int readTimeout) throws IOException {
+        HttpGet request = new HttpGet(url);
+        return execRequest(request, connTimeout, readTimeout);
+    }
+
+    /** Builds a client configured with the given connect and read timeouts. */
+    private HttpClient getHttpClient(int connectionTimeout, int readTimeout) {
+        final HttpParams httpParams = new BasicHttpParams();
+        HttpConnectionParams.setSoTimeout(httpParams, readTimeout);
+        HttpConnectionParams.setConnectionTimeout(httpParams, connectionTimeout);
+
+        return new DefaultHttpClient(httpParams);
+    }
+
+    /**
+     * Executes the request and returns the response body.
+     *
+     * @param request           the request to send
+     * @param connectionTimeout connect timeout for this request
+     * @param readTimeout       socket read timeout for this request
+     * @return the response body, or an empty string when the response carries no entity
+     * @throws IOException if the request fails or the response status is not 200
+     */
+    public String execRequest(HttpRequestBase request, int connectionTimeout, int readTimeout) throws IOException {
+        HttpClient httpClient = getHttpClient(connectionTimeout, readTimeout);
+        try {
+            HttpResponse response = httpClient.execute(request);
+            HttpEntity entity = response.getEntity();
+            // EntityUtils rejects a null entity; UTF-8 is only the fallback when the response names no charset.
+            String msg = entity == null ? "" : EntityUtils.toString(entity, "UTF-8");
+            int code = response.getStatusLine().getStatusCode();
+            if (logger.isTraceEnabled()) {
+                String displayMessage = msg.length() > MAX_LOGGED_BODY_LENGTH
+                        ? msg.substring(0, MAX_LOGGED_BODY_LENGTH)
+                        : msg;
+                // Argument order follows the placeholders: request line first, then the status code.
+                logger.trace("Send request: {}. And receive response[{}] which length is {}, and content is {}.",
+                        request.getRequestLine(), code, msg.length(), displayMessage);
+            }
+            if (code != 200) {
+                throw new IOException("Invalid http response " + code + " when send request: "
+                        + request.getURI().toString() + "\n" + msg);
+            }
+            return msg;
+        } catch (IOException e) {
+            logger.error("error when send http request:{}", request.getURI(), e);
+            throw e;
+        } finally {
+            request.releaseConnection();
+            // The client is created per call, so its connection manager must be closed here as well.
+            httpClient.getConnectionManager().shutdown();
+        }
+    }
+}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
new file mode 100644
index 0000000..5b2caf4
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy
+
+import org.apache.spark.internal.Logging
+
+/**
+ * A client which asks for an application's state from Yarn's resource manager or the Standalone master.
+ */
+object SparkApplicationClient extends Logging {
+
+  // Copied from org.apache.spark.deploy.master.ApplicationState
+  val finalStates = Set("FINISHED", "FAILED", "KILLED", "UNKNOWN")
+
+  // master is spark standalone and deployMode is cluster
+  val STANDALONE_CLUSTER: String = "standalone_cluster"
+
+  // Pause between two state checks, in milliseconds
+  private val pollIntervalMs = 10000L
+
+  /**
+   * Blocks until the application whose name contains stepId reaches one of [[finalStates]],
+   * logging the observed state on every check, and then returns that final state.
+   *
+   * Currently only for Spark jobs whose master is Spark standalone and whose deployMode is cluster,
+   * because the standalone client lacks the property "spark.standalone.submit.waitAppCompletion".
+   *
+   * The returned state may be FAILED, KILLED or UNKNOWN; this method does not throw for those,
+   * so the caller has to inspect the result. There is no timeout: the call keeps polling for as
+   * long as the reported state is not final.
+   *
+   * @param sparkMaster kind of cluster to ask; only [[STANDALONE_CLUSTER]] is supported
+   * @param stepId      id of the job step, matched against the application name
+   * @return the final state of the application
+   * @throws UnsupportedOperationException if sparkMaster is not [[STANDALONE_CLUSTER]]
+   */
+  def awaitAndCheckAppState(sparkMaster: String, stepId: String): String = {
+    logInfo(s"AwaitAndCheckAppState $stepId ...")
+    sparkMaster match {
+      case STANDALONE_CLUSTER =>
+        var appState = StandaloneAppClient.getAppState(stepId)
+        logInfo(s"$stepId state is $appState .")
+        // Leave the loop as soon as a final state is seen; otherwise wait and ask again.
+        while (!finalStates.contains(appState)) {
+          Thread.sleep(pollIntervalMs)
+          appState = StandaloneAppClient.getAppState(stepId)
+          logInfo(s"$stepId state is $appState .")
+        }
+        appState
+      case m => throw new UnsupportedOperationException("waitAndCheckAppState " + m)
+    }
+  }
+}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/StandaloneAppClient.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/StandaloneAppClient.scala
new file mode 100644
index 0000000..15f456a
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/StandaloneAppClient.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.engine.spark.utils.RestService
+import org.apache.spark.internal.Logging
+
+import java.io.IOException
+import scala.collection.mutable
+import scala.util.Try
+import scala.util.parsing.json.JSON._
+
+
+/**
+ * Looks up the state of Kylin build applications on a Spark standalone master by requesting
+ * the "/json" page of the master web UI and caching the applications found there.
+ *
+ * All access to the cache is guarded by the cache's own monitor.
+ */
+object StandaloneAppClient extends Logging {
+
+  private val JOB_STEP_PREFIX = "job_step_"
+
+  // Keys of the master's JSON response which hold lists of applications
+  private val APP_LIST_KEYS = Seq("activeapps", "completedapps")
+
+  // appId -> (appName, state, starttime)
+  private val cachedKylinJobMap: mutable.Map[String, (String, String, Long)] = new mutable.LinkedHashMap[String, (String, String, Long)]()
+
+  private var jobInfoUpdateTime = System.currentTimeMillis()
+  // Minimum time between two requests to the master, in milliseconds
+  private val refreshIntervalMs = 10000L
+  private val cacheTtl = 3600 * 1000 * 24 * 5
+  private val cacheMaxSize = 30000
+
+  // private val masterUrlHtml: String = KylinConfig.getInstanceFromEnv.getSparkStandaloneMasterWebUI + "/app/?appId="
+  private val masterUrlJson: String = KylinConfig.getInstanceFromEnv.getSparkStandaloneMasterWebUI + "/json"
+
+  private val restService: RestService = new RestService(10000, 10000)
+
+
+  /**
+   * Returns the cached applications, asking the master again when the cache is empty or the
+   * last attempt is at least 10 seconds old. A failed request is logged and leaves the cache
+   * as it was.
+   *
+   * @see org.apache.spark.deploy.master.ApplicationInfo
+   * @return the cache itself (not a copy), keyed by application id with
+   *         (name, state, start time) as value
+   */
+  def getRunningJobs: mutable.Map[String, (String, String, Long)] = cachedKylinJobMap.synchronized {
+    val currMills = System.currentTimeMillis
+    if (cachedKylinJobMap.isEmpty || currMills - jobInfoUpdateTime >= refreshIntervalMs) {
+      logDebug("Updating app status ...")
+      try {
+        val realResp = restService.getRequest(masterUrlJson)
+        parseApplicationState(realResp)
+      } catch {
+        case ioe: IOException => logError("Can not connect to standalone master service.", ioe)
+        case e: Exception => logError("Error .", e)
+      }
+      jobInfoUpdateTime = currMills
+    }
+    cachedKylinJobMap
+  }
+
+  /**
+   * Returns the state of the application whose name contains stepId.
+   *
+   * @param stepId id of the job step, matched against the application name
+   * @return "SUBMITTED" when no cached application matches; otherwise the state of the
+   *         matching application with the greatest start time
+   */
+  def getAppState(stepId: String): String = cachedKylinJobMap.synchronized {
+    val candidates = getRunningJobs.values.filter(app => app._1.contains(stepId)).toList
+    if (candidates.isEmpty) {
+      "SUBMITTED"
+    } else {
+      // find the recent submitted application
+      candidates.maxBy(x => x._3)._2
+    }
+  }
+
+
+  /**
+   * Reads the applications listed under "activeapps" and "completedapps" of the master's JSON
+   * response and stores those whose name contains "job_step_" in the cache. Active
+   * applications are included so that a re-submitted step is not answered with the state of
+   * an earlier, already completed application of the same name.
+   *
+   * A response which is not a JSON object, or which lacks these keys, adds nothing.
+   *
+   * @param responseStr body returned by the "/json" page of the master web UI
+   */
+  def parseApplicationState(responseStr: String): Unit = cachedKylinJobMap.synchronized {
+    val curr = System.currentTimeMillis()
+
+    val respJson: Map[String, Any] = parseFull(responseStr) match {
+      case Some(map: Map[_, _]) => map.asInstanceOf[Map[String, Any]]
+      case _ =>
+        logWarning("Response of standalone master is not a JSON object, ignore it.")
+        Map.empty[String, Any]
+    }
+
+    for (key <- APP_LIST_KEYS; app <- appList(respJson, key)) {
+      val name = stringField(app, "name")
+      if (name.contains(JOB_STEP_PREFIX)) {
+        cachedKylinJobMap(stringField(app, "id")) = (name, stringField(app, "state"), startTimeOf(app))
+      }
+    }
+
+    // Clean too old jobs; collect the ids first because the map must not change while it is iterated
+    if (cachedKylinJobMap.size > cacheMaxSize) {
+      val expiredIds = cachedKylinJobMap.collect {
+        case (id, (_, _, startTime)) if curr - startTime > cacheTtl => id
+      }.toList
+      cachedKylinJobMap --= expiredIds
+    }
+  }
+
+  // Returns the list of JSON objects stored under key, or Nil if it is absent or not a list.
+  private def appList(respJson: Map[String, Any], key: String): List[Map[String, Any]] = {
+    respJson.get(key) match {
+      case Some(apps: List[_]) => apps.collect { case app: Map[_, _] => app.asInstanceOf[Map[String, Any]] }
+      case _ => Nil
+    }
+  }
+
+  // Returns the text stored under key, or "" if it is absent or not a string.
+  private def stringField(app: Map[String, Any], key: String): String = {
+    app.get(key) match {
+      case Some(s: String) => s
+      case _ => ""
+    }
+  }
+
+  // Returns "starttime" as a Long whether it is given as a number or as text, or 0 otherwise.
+  private def startTimeOf(app: Map[String, Any]): Long = {
+    app.get("starttime") match {
+      case Some(d: Double) => d.toLong
+      case Some(s: String) => Try(s.trim.toLong).getOrElse(0L)
+      case _ => 0L
+    }
+  }
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/resources/response/application-detail-246.html b/kylin-spark-project/kylin-spark-engine/src/test/resources/response/application-detail-246.html
new file mode 100644
index 0000000..9371cee
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/test/resources/response/application-detail-246.html
@@ -0,0 +1,167 @@
+<!DOCTYPE html>
+<html>
+<head>
+    <meta http-equiv="Content-type" content="text/html; charset=utf-8"/>
+    <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css"/>
+    <link rel="stylesheet" href="/static/vis-timeline-graph2d.min.css" type="text/css"/>
+    <link rel="stylesheet" href="/static/webui.css" type="text/css"/>
+    
<link rel="stylesheet" href="/static/timeline-view.css" type="text/css"/> + <script src="/static/sorttable.js"></script> + <script src="/static/jquery-1.12.4.min.js"></script> + <script src="/static/vis-timeline-graph2d.min.js"></script> + <script src="/static/bootstrap-tooltip.js"></script> + <script src="/static/initialize-tooltips.js"></script> + <script src="/static/table.js"></script> + <script src="/static/additional-metrics.js"></script> + <script src="/static/timeline-view.js"></script> + <script src="/static/log-view.js"></script> + <script src="/static/webui.js"></script> + <script>setUIRoot('')</script> + + <link rel="shortcut icon" href="/static/spark-logo-77x50px-hd.png"></link> + <title>Application: job_step_93338927-bbbf-4baf-8a22-b44cb701b57b-01</title> +</head> +<body> +<div class="container-fluid"> + <div class="row-fluid"> + <div class="span12"> + <h3 style="vertical-align: middle; display: inline-block;"> + <a style="text-decoration: none" href="/"> + <img src="/static/spark-logo-77x50px-hd.png"/> + <span class="version" style="margin-right: 15px;">2.4.6</span> + </a> + Application: job_step_93338927-bbbf-4baf-8a22-b44cb701b57b-01 + </h3> + </div> + </div> + <div class="row-fluid"> + <div class="span12"> + <ul class="unstyled"> + <li><strong>ID:</strong> app-20210701234053-0002</li> + <li><strong>Name:</strong> job_step_93338927-bbbf-4baf-8a22-b44cb701b57b-01</li> + <li><strong>User:</strong> root</li> + <li><strong>Cores:</strong> + Unlimited (4 granted) + </li> + <li> + <span data-toggle="tooltip" title="Maximum number of executors that this application will use. This limit is finite only when + dynamic allocation is enabled. The number of granted executors may exceed the limit + ephemerally when executors are being killed. 
+ " data-placement="right"> + <strong>Executor Limit: </strong> + Unlimited + (4 granted) + </span> + </li> + <li> + <strong>Executor Memory:</strong> + 1024.0 MB + </li> + <li><strong>Submit Date:</strong> 2021/07/01 23:40:53</li> + <li><strong>State:</strong> FINISHED</li> + + </ul> + </div> + </div> + <div class="row-fluid"> <!-- Executors --> + <div class="span12"> + <span class="collapse-aggregated-executors collapse-table" + onClick="collapseTable('collapse-aggregated-executors','aggregated-executors')"> + <h4> + <span class="collapse-table-arrow arrow-open"></span> + <a>Executor Summary (4)</a> + </h4> + </span> + <div class="aggregated-executors collapsible-table"> + <table class="table table-bordered table-condensed table-striped sortable"> + <thead> + <th width="" class="">ExecutorID</th> + <th width="" class="">Worker</th> + <th width="" class="">Cores</th> + <th width="" class="">Memory</th> + <th width="" class="">State</th> + <th width="" class="">Logs</th> + </thead> + <tbody> + + </tbody> + </table> + </div> + <span class="collapse-aggregated-removedExecutors collapse-table" onClick="collapseTable('collapse-aggregated-removedExecutors', + 'aggregated-removedExecutors')"> + <h4> + <span class="collapse-table-arrow arrow-open"></span> + <a>Removed Executors (4)</a> + </h4> + </span> + <div class="aggregated-removedExecutors collapsible-table"> + <table class="table table-bordered table-condensed table-striped sortable"> + <thead> + <th width="" class="">ExecutorID</th> + <th width="" class="">Worker</th> + <th width="" class="">Cores</th> + <th width="" class="">Memory</th> + <th width="" class="">State</th> + <th width="" class="">Logs</th> + </thead> + <tbody> + <tr> + <td>2</td> + <td> + <a href="http://10.1.3.90:10041">worker-20210630172110-10.1.3.90-10040</a> + </td> + <td>1</td> + <td>1024</td> + <td>KILLED</td> + <td> + <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&executorId=2&logType=stdout">stdout</a> + <a 
href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&executorId=2&logType=stderr">stderr</a> + </td> + </tr> + <tr> + <td>1</td> + <td> + <a href="http://10.1.3.90:10041">worker-20210630172110-10.1.3.90-10040</a> + </td> + <td>1</td> + <td>1024</td> + <td>KILLED</td> + <td> + <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&executorId=1&logType=stdout">stdout</a> + <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&executorId=1&logType=stderr">stderr</a> + </td> + </tr> + <tr> + <td>3</td> + <td> + <a href="http://10.1.3.90:10041">worker-20210630172110-10.1.3.90-10040</a> + </td> + <td>1</td> + <td>1024</td> + <td>KILLED</td> + <td> + <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&executorId=3&logType=stdout">stdout</a> + <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&executorId=3&logType=stderr">stderr</a> + </td> + </tr> + <tr> + <td>0</td> + <td> + <a href="http://10.1.3.90:10041">worker-20210630172110-10.1.3.90-10040</a> + </td> + <td>1</td> + <td>1024</td> + <td>KILLED</td> + <td> + <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&executorId=0&logType=stdout">stdout</a> + <a href="http://10.1.3.90:10041/logPage?appId=app-20210701234053-0002&executorId=0&logType=stderr">stderr</a> + </td> + </tr> + </tbody> + </table> + </div> + </div> + </div> +</div> +</body> +</html> \ No newline at end of file diff --git a/kylin-spark-project/kylin-spark-engine/src/test/resources/response/standalone-master-246.json b/kylin-spark-project/kylin-spark-engine/src/test/resources/response/standalone-master-246.json new file mode 100644 index 0000000..aa02873 --- /dev/null +++ b/kylin-spark-project/kylin-spark-engine/src/test/resources/response/standalone-master-246.json @@ -0,0 +1,94 @@ +{ + "url": "spark://cdh-master:10030", + "workers": [ + { + "id": "worker-20210630172110-10.1.3.90-10040", + "host": "10.1.3.90", + "port": 10040, + "webuiaddress": 
"http://10.1.3.90:10041", + "cores": 5, + "coresused": 0, + "coresfree": 5, + "memory": 20480, + "memoryused": 0, + "memoryfree": 20480, + "state": "ALIVE", + "lastheartbeat": 1625193717228 + } + ], + "aliveworkers": 1, + "cores": 5, + "coresused": 0, + "memory": 20480, + "memoryused": 0, + "activeapps": [], + "completedapps": [ + { + "id": "app-20210701230444-0000", + "starttime": 1625151884352, + "name": "job_step_6eb0d430-2882-4699-9915-1154959c2cd8-01", + "cores": 4, + "user": "root", + "memoryperslave": 1024, + "submitdate": "Thu Jul 01 23:04:44 CST 2021", + "state": "FINISHED", + "duration": 26068 + }, + { + "id": "app-20210701232026-0001", + "starttime": 1625152826024, + "name": "job_step_6eb0d430-2882-4699-9915-1154959c2cd8-01", + "cores": 4, + "user": "root", + "memoryperslave": 1024, + "submitdate": "Thu Jul 01 23:20:26 CST 2021", + "state": "FINISHED", + "duration": 29656 + }, + { + "id": "app-20210701234053-0002", + "starttime": 1625154053758, + "name": "job_step_93338927-bbbf-4baf-8a22-b44cb701b57b-01", + "cores": 4, + "user": "root", + "memoryperslave": 1024, + "submitdate": "Thu Jul 01 23:40:53 CST 2021", + "state": "FINISHED", + "duration": 27223 + } + ], + "activedrivers": [], + "completeddrivers": [ + { + "id": "driver-20210701230402-0000", + "starttime": "1625151842605", + "state": "FINISHED", + "cores": 1, + "memory": 1024, + "submitdate": "Thu Jul 01 23:04:02 CST 2021", + "worker": "worker-20210630172110-10.1.3.90-10040", + "mainclass": "org.apache.kylin.engine.spark.application.SparkEntry" + }, + { + "id": "driver-20210701232014-0001", + "starttime": "1625152814554", + "state": "FINISHED", + "cores": 1, + "memory": 1024, + "submitdate": "Thu Jul 01 23:20:14 CST 2021", + "worker": "worker-20210630172110-10.1.3.90-10040", + "mainclass": "org.apache.kylin.engine.spark.application.SparkEntry" + }, + { + "id": "driver-20210701233954-0002", + "starttime": "1625153994011", + "state": "FINISHED", + "cores": 1, + "memory": 1024, + "submitdate": "Thu 
Jul 01 23:39:54 CST 2021",
+      "worker": "worker-20210630172110-10.1.3.90-10040",
+      "mainclass": "org.apache.kylin.engine.spark.application.SparkEntry"
+    }
+  ],
+  "status": "ALIVE"
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/spark/deploy/StandaloneAppClientTest.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/spark/deploy/StandaloneAppClientTest.scala
new file mode 100644
index 0000000..a1fabfe
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/spark/deploy/StandaloneAppClientTest.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.apache.commons.io.IOUtils
+import org.apache.spark.sql.common.{LocalMetadata, SparderBaseFunSuite}
+
+import java.io.InputStream
+
+class StandaloneAppClientTest extends SparderBaseFunSuite with LocalMetadata {
+
+  // Reads a classpath resource fully as UTF-8 text; the stream is closed even if reading fails.
+  private def readResource(path: String): String = {
+    val in: InputStream = getClass.getClassLoader.getResourceAsStream(path)
+    assert(in != null, s"Missing test resource $path")
+    try IOUtils.toString(in, "UTF-8") finally in.close()
+  }
+
+  test("Test find state function from HTML and JSON response.") {
+    val htmlStr = readResource("response/application-detail-246.html")
+    val jsonStr = readResource("response/standalone-master-246.json")
+
+    // The application detail page shows the state as a list item.
+    assert(htmlStr.contains("<strong>State:</strong> FINISHED"))
+
+    StandaloneAppClient.parseApplicationState(jsonStr)
+    val res = StandaloneAppClient.getRunningJobs
+    // Two applications in the JSON response carry this step id in their name.
+    val state = StandaloneAppClient.getAppState("6eb0d430-2882-4699-9915-1154959c2cd8")
+    assert("FINISHED".equals(state))
+
+    assert(res.size == 3)
+    assert(res.contains("app-20210701232026-0001"))
+    assert("FINISHED".equals(res.getOrElse("app-20210701232026-0001", ("", "", 1L))._2))
+  }
+}