This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new d8974e8525 KYLIN-5217 Create a initial commit
d8974e8525 is described below
commit d8974e85252f7dd6732293beb6387d277df56057
Author: longfei.jiang <[email protected]>
AuthorDate: Tue Aug 9 14:22:39 2022 +0800
KYLIN-5217 Create a initial commit
---
.../kylin/cluster/ClusterManagerFactory.scala | 37 ++++
.../apache/kylin/cluster/K8sClusterManager.scala | 65 +++++++
.../kylin/cluster/SchedulerInfoCmdHelper.scala | 167 +++++++++++++++++
.../kylin/cluster/StandaloneClusterManager.scala | 22 +++
.../apache/kylin/cluster/YarnClusterManager.scala | 206 +++++++++++++++++++++
.../cluster/parser/CapacitySchedulerParser.scala | 92 +++++++++
.../kylin/cluster/parser/FairSchedulerParser.scala | 64 +++++++
.../kylin/cluster/parser/SchedulerParser.scala | 43 +++++
.../cluster/parser/SchedulerParserFactory.scala | 43 +++++
.../engine/spark/scheduler/ClusterMonitor.scala | 82 ++++++++
.../kylin/engine/spark/scheduler/JobRuntime.scala | 59 ++++++
.../engine/spark/scheduler/KylinJobEvent.scala | 44 +++++
.../engine/spark/scheduler/KylinJobListener.scala | 23 +++
.../org/apache/spark/application/JobMonitor.scala | 94 ++++++++++
.../apache/spark/application/JobWorkSpace.scala | 153 +++++++++++++++
.../org/apache/spark/application/JobWorker.scala | 71 +++++++
.../deploy/master/StandaloneClusterManager.scala | 157 ++++++++++++++++
.../apache/spark/scheduler/KylinJobEventLoop.scala | 48 +++++
18 files changed, 1470 insertions(+)
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala
new file mode 100644
index 0000000000..6009232f57
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cluster
+
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import io.kyligence.kap.guava20.shaded.common.util.concurrent.SimpleTimeLimiter
+import org.apache.kylin.common.KylinConfig
+import org.apache.spark.util.KylinReflectUtils
+
+
+object ClusterManagerFactory {
+
+ val pool: ExecutorService = Executors.newCachedThreadPool
+
+  def create(kylinConfig: KylinConfig): IClusterManager = {
+    val timeLimiter = SimpleTimeLimiter.create(pool)
+    val target = KylinReflectUtils.createObject(kylinConfig.getClusterManagerClassName)._1.asInstanceOf[IClusterManager]
+
+    timeLimiter.newProxy(target, classOf[IClusterManager], kylinConfig.getClusterManagerTimeoutThreshold, TimeUnit.SECONDS)
+  }
+}
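
For context, a minimal usage sketch of the factory above, assuming a KylinConfig has already been loaded via KylinConfig.getInstanceFromEnv; every call on the returned proxy is bounded by kylinConfig.getClusterManagerTimeoutThreshold seconds:

    import org.apache.kylin.cluster.ClusterManagerFactory
    import org.apache.kylin.common.KylinConfig

    val kylinConfig = KylinConfig.getInstanceFromEnv
    // The returned IClusterManager is a time-limited proxy around the class named by
    // kylinConfig.getClusterManagerClassName (e.g. YarnClusterManager).
    val clusterManager = ClusterManagerFactory.create(kylinConfig)
    val maxAllocation = clusterManager.fetchMaximumResourceAllocation
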
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala
new file mode 100644
index 0000000000..8711622885
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.cluster
+
+import java.util
+
+import com.google.common.collect.Lists
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+class K8sClusterManager extends IClusterManager with Logging {
+ override def fetchMaximumResourceAllocation: ResourceInfo = {
+ ResourceInfo(Int.MaxValue, 1000)
+ }
+
+  override def fetchQueueAvailableResource(queueName: String): AvailableResource = {
+    AvailableResource(ResourceInfo(Int.MaxValue, 1000), ResourceInfo(Int.MaxValue, 1000))
+  }
+
+ override def getBuildTrackingUrl(sparkSession: SparkSession): String = {
+ logInfo("Get Build Tracking Url!")
+ val applicationId = sparkSession.sparkContext.applicationId
+ ""
+ }
+
+  override def killApplication(jobStepId: String): Unit = {
+    logInfo(s"Kill Application $jobStepId !")
+  }
+
+  override def killApplication(jobStepPrefix: String, jobStepId: String): Unit = {
+    logInfo(s"Kill Application $jobStepPrefix $jobStepId !")
+  }
+
+ override def isApplicationBeenKilled(applicationId: String): Boolean = {
+ true
+ }
+
+ override def getRunningJobs(queues: util.Set[String]): util.List[String] = {
+ Lists.newArrayList()
+ }
+
+ override def fetchQueueStatistics(queueName: String): ResourceInfo = {
+ ResourceInfo(Int.MaxValue, 1000)
+ }
+
+ override def applicationExisted(jobId: String): Boolean = {
+ false
+ }
+
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala
new file mode 100644
index 0000000000..95c35a8a75
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cluster
+
+import org.apache.kylin.engine.spark.utils.StorageUtils
+import io.netty.util.internal.ThrowableUtil
+import org.apache.hadoop.yarn.conf.{HAUtil, YarnConfiguration}
+import org.apache.kylin.common.util.{JsonUtil, ShellException}
+import org.apache.spark.internal.Logging
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.nio.charset.StandardCharsets
+
+object SchedulerInfoCmdHelper extends Logging {
+  private val useHttps: Boolean = YarnConfiguration.useHttps(StorageUtils.getCurrentYarnConfiguration)
+
+ def schedulerInfo: String = {
+ val cmds = getSocketAddress.map(address => genCmd(address._1, address._2))
+ getInfoByCmds(cmds)
+ }
+
+  def metricsInfo: String = {
+    val cmds = getSocketAddress.map(address => genMetricsCmd(address._1, address._2))
+    getInfoByCmds(cmds)
+  }
+
+  private[cluster] def getInfoByCmds(cmds: Iterable[String]): String = {
+    val results = cmds.map { cmd =>
+      try {
+        logInfo(s"Executing the command: ${cmd}")
+        execute(cmd)
+      } catch {
+        case throwable: Throwable => (1, ThrowableUtil.stackTraceToString(throwable))
+      }
+    }
+    val tuples = results.filter(result => result._1 == 0 && JsonUtil.isJson(result._2))
+    if (tuples.isEmpty) {
+      val errors = results.map(_._2).mkString("\n")
+      logWarning(s"Error occurred when getting scheduler info from cmd $cmds")
+      throw new RuntimeException(errors)
+    } else {
+      require(tuples.size == 1)
+      tuples.head._2
+    }
+  }
+
+  private[cluster] def getSocketAddress: Map[String, Int] = {
+    val conf = StorageUtils.getCurrentYarnConfiguration
+    val addresses = if (HAUtil.isHAEnabled(conf)) {
+      val haIds = HAUtil.getRMHAIds(conf).toArray
+      require(haIds.nonEmpty, "Ha ids is empty, please check your yarn-site.xml.")
+      if (useHttps) {
+        haIds.map(id => conf.getSocketAddr(s"${YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS}.$id",
+          YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_HTTPS_PORT))
+      } else {
+        haIds.map(id => conf.getSocketAddr(s"${YarnConfiguration.RM_WEBAPP_ADDRESS}.$id",
+          YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_PORT))
+      }
+    } else {
+      if (useHttps) {
+        Array(conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_HTTPS_PORT))
+      } else {
+        Array(conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_PORT))
+      }
+    }
+    addresses.map(address => address.getHostName -> address.getPort).toMap
+  }
+
+ private[cluster] def genCmd(hostName: String, port: Int): String = {
+ val uri = if (useHttps) {
+ s"https://$hostName:$port/ws/v1/cluster/scheduler"
+ } else {
+ s"http://$hostName:$port/ws/v1/cluster/scheduler"
+ }
+ s"""curl -k --negotiate -u : "$uri""""
+ }
+
+ private[cluster] def genMetricsCmd(hostName: String, port: Int): String = {
+ val uri = if (useHttps) {
+ s"https://$hostName:$port/ws/v1/cluster/metrics"
+ } else {
+ s"http://$hostName:$port/ws/v1/cluster/metrics"
+ }
+ s"""curl -k --negotiate -u : "$uri""""
+ }
+
+  /**
+   * Execute the given command and return only its standard output.
+   *
+   * @param command the shell command to run
+   * @return a tuple of (exit code, stdout)
+   */
+ private[cluster] def execute(command: String): (Int, String) = {
+ try {
+ val cmd = new Array[String](3)
+ val osName = System.getProperty("os.name")
+ if (osName.startsWith("Windows")) {
+ cmd(0) = "cmd.exe"
+ cmd(1) = "/C"
+ }
+ else {
+ cmd(0) = "/bin/bash"
+ cmd(1) = "-c"
+ }
+ cmd(2) = command
+ val builder = new ProcessBuilder(cmd: _*)
+ builder.environment().putAll(System.getenv())
+ val proc = builder.start
+ val resultStdout = new StringBuilder
+      val inReader = new BufferedReader(new InputStreamReader(proc.getInputStream, StandardCharsets.UTF_8.name()))
+      val newLine = System.getProperty("line.separator")
+      var line: String = inReader.readLine()
+      while (line != null) {
+        resultStdout.append(line).append(newLine)
+        line = inReader.readLine()
+      }
+
+      val stderr = new StringBuilder
+      val errorReader = new BufferedReader(new InputStreamReader(proc.getErrorStream, StandardCharsets.UTF_8.name()))
+      line = errorReader.readLine()
+      while (line != null) {
+        stderr.append(line).append(newLine)
+        line = errorReader.readLine()
+      }
+
+      logInfo(s"The corresponding http response for the above command: \n ${stderr}")
+ try {
+ val exitCode = proc.waitFor
+ if (exitCode != 0) {
+ logError(s"executing command $command; exit code: $exitCode")
+
logError(s"==========================[stderr]===============================")
+ logError(stderr.toString)
+
logError(s"==========================[stderr]===============================")
+
+
logError(s"==========================[stdout]===============================")
+ logError(resultStdout.toString)
+
logError(s"==========================[stdout]===============================")
+ }
+ (exitCode, resultStdout.toString)
+ } catch {
+ case e: InterruptedException =>
+ Thread.currentThread.interrupt()
+ throw e
+ }
+ } catch {
+ case e: Exception => throw new ShellException(e)
+ }
+ }
+}
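
As a sketch, the commands generated above query the ResourceManager web services; the host and port below are placeholders for whatever getSocketAddress resolves from yarn-site.xml:

    // genCmd("rm-host", 8088) produces (non-HTTPS case):
    //   curl -k --negotiate -u : "http://rm-host:8088/ws/v1/cluster/scheduler"
    // and getInfoByCmds keeps the single result whose stdout is valid JSON.
    val schedulerJson = SchedulerInfoCmdHelper.schedulerInfo // raw scheduler REST payload
    val metricsJson = SchedulerInfoCmdHelper.metricsInfo     // raw cluster metrics payload
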
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala
new file mode 100644
index 0000000000..c3737a9090
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cluster
+
+class StandaloneClusterManager extends org.apache.spark.deploy.master.StandaloneClusterManager {
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala
new file mode 100644
index 0000000000..c3f6275a0a
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cluster
+
+import java.io.IOException
+import java.util
+import java.util.stream.Collectors
+import java.util.{ArrayList, EnumSet, List, Set}
+
+import com.google.common.collect.{Lists, Sets}
+import org.apache.kylin.cluster.parser.SchedulerParserFactory
+import org.apache.kylin.engine.spark.utils.StorageUtils
+import org.apache.commons.collections.CollectionUtils
+import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, QueueInfo, YarnApplicationState}
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.YarnException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConverters._
+
+class YarnClusterManager extends IClusterManager with Logging {
+
+ import org.apache.kylin.cluster.YarnClusterManager._
+
+ private lazy val maximumResourceAllocation: ResourceInfo = {
+ withYarnClient(yarnClient => {
+ try {
+ val response = yarnClient.createApplication().getNewApplicationResponse
+ val cap = response.getMaximumResourceCapability
+ val resourceInfo = ResourceInfo(cap.getMemory, cap.getVirtualCores)
+ logInfo(s"Cluster maximum resource allocation $resourceInfo")
+ resourceInfo
+ } catch {
+ case throwable: Throwable =>
+ logError("Error occurred when get resource upper limit per
container.", throwable)
+ throw throwable
+ }
+ })
+ }
+
+  override def fetchMaximumResourceAllocation: ResourceInfo = maximumResourceAllocation
+
+  override def fetchQueueAvailableResource(queueName: String): AvailableResource = {
+ val info = SchedulerInfoCmdHelper.schedulerInfo
+ val parser = SchedulerParserFactory.create(info)
+ parser.parse(info)
+ parser.availableResource(queueName)
+ }
+
+ def getYarnApplicationReport(applicationId: String): ApplicationReport = {
+ withYarnClient(yarnClient => {
+ val array = applicationId.split("_")
+ if (array.length < 3) return null
+ val appId = ApplicationId.newInstance(array(1).toLong, array(2).toInt)
+ yarnClient.getApplicationReport(appId)
+ })
+ }
+
+ override def getBuildTrackingUrl(sparkSession: SparkSession): String = {
+ val applicationId = sparkSession.sparkContext.applicationId
+ val applicationReport = getYarnApplicationReport(applicationId)
+ if (null == applicationReport) null
+ else applicationReport.getTrackingUrl
+ }
+
+ def killApplication(jobStepId: String): Unit = {
+ killApplication("job_step_", jobStepId)
+ }
+
+ def killApplication(jobStepPrefix: String, jobStepId: String): Unit = {
+ withYarnClient(yarnClient => {
+ var orphanApplicationId: String = null
+ try {
+ val types: Set[String] = Sets.newHashSet("SPARK")
+        val states: EnumSet[YarnApplicationState] = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
+          YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING)
+        val applicationReports: List[ApplicationReport] = yarnClient.getApplications(types, states)
+ if (CollectionUtils.isEmpty(applicationReports)) return
+ import scala.collection.JavaConverters._
+ for (report <- applicationReports.asScala) {
+ if (report.getName.equalsIgnoreCase(jobStepPrefix + jobStepId)) {
+ orphanApplicationId = report.getApplicationId.toString
+ yarnClient.killApplication(report.getApplicationId)
+ logInfo(s"kill orphan yarn application $orphanApplicationId
succeed, job step $jobStepId")
+ }
+ }
+ } catch {
+ case ex@(_: YarnException | _: IOException) =>
+ logError(s"kill orphan yarn application $orphanApplicationId failed,
job step $jobStepId", ex)
+ }
+ })
+ }
+
+ def getRunningJobs(queues: Set[String]): List[String] = {
+ withYarnClient(yarnClient => {
+ if (queues.isEmpty) {
+        val applications = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING))
+ if (null == applications) new ArrayList[String]()
+ else applications.asScala.map(_.getName).asJava
+ } else {
+ val runningJobs: List[String] = new ArrayList[String]()
+ for (queue <- queues.asScala) {
+ val applications = yarnClient.getQueueInfo(queue).getApplications
+ if (null != applications) {
+            applications.asScala.filter(_.getYarnApplicationState == YarnApplicationState.RUNNING)
+              .map(_.getName).foreach(runningJobs.add)
+ }
+ }
+ runningJobs
+ }
+ })
+ }
+
+ override def fetchQueueStatistics(queueName: String): ResourceInfo = {
+ withYarnClient(yarnClient => {
+ val qs = yarnClient.getQueueInfo(queueName).getQueueStatistics
+ ResourceInfo(qs.getAvailableMemoryMB.toInt, qs.getAvailableVCores.toInt)
+ })
+ }
+
+ override def isApplicationBeenKilled(jobStepId: String): Boolean = {
+ withYarnClient(yarnClient => {
+ val types: Set[String] = Sets.newHashSet("SPARK")
+      val states: EnumSet[YarnApplicationState] = EnumSet.of(YarnApplicationState.KILLED)
+      val applicationReports: List[ApplicationReport] = yarnClient.getApplications(types, states)
+ if (!CollectionUtils.isEmpty(applicationReports)) {
+ import scala.collection.JavaConverters._
+ for (report <- applicationReports.asScala) {
+ if (report.getName.equalsIgnoreCase("job_step_" + jobStepId)) {
+ return true
+ }
+ }
+ }
+ false
+ })
+ }
+
+ def applicationExisted(jobId: String): Boolean = {
+ withYarnClient(yarnClient => {
+ val types: util.Set[String] = Sets.newHashSet("SPARK")
+      val states: util.EnumSet[YarnApplicationState] = util.EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
+        YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING)
+      val applicationReports: util.List[ApplicationReport] = yarnClient.getApplications(types, states)
+ if (!CollectionUtils.isEmpty(applicationReports)) {
+ import scala.collection.JavaConverters._
+ for (report <- applicationReports.asScala) {
+ if (report.getName.equalsIgnoreCase(jobId)) {
+ return true
+ }
+ }
+ }
+ return false
+ })
+ }
+
+ def listQueueNames(): util.List[String] = {
+ withYarnClient(yarnClient => {
+ val queues: util.List[QueueInfo] = yarnClient.getAllQueues
+ val queueNames: util.List[String] = Lists.newArrayList()
+ for (queue <- queues.asScala) {
+ queueNames.add(queue.getQueueName)
+ }
+ queueNames
+ })
+ }
+
+}
+
+object YarnClusterManager {
+
+ def withYarnClient[T](body: YarnClient => T): T = {
+ val yarnClient = YarnClient.createYarnClient
+ yarnClient.init(getSpecifiedConf)
+ yarnClient.start()
+ try {
+ body(yarnClient)
+ } finally {
+ yarnClient.close()
+ }
+ }
+
+ private def getSpecifiedConf: YarnConfiguration = {
+ // yarn cluster mode couldn't access local hadoop_conf_dir
+ val yarnConf = StorageUtils.getCurrentYarnConfiguration
+ // https://issues.apache.org/jira/browse/SPARK-15343
+ yarnConf.set("yarn.timeline-service.enabled", "false")
+ yarnConf
+ }
+}
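
A minimal sketch of the withYarnClient helper defined above; the body runs against a started YarnClient and the client is always closed afterwards:

    import org.apache.kylin.cluster.YarnClusterManager.withYarnClient

    val queues = withYarnClient { yarnClient =>
      // any YarnClient call can go here; getAllQueues mirrors listQueueNames() above
      yarnClient.getAllQueues
    }
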
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala
new file mode 100644
index 0000000000..e22700c7e7
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cluster.parser
+
+import java.util.{List => JList}
+import com.fasterxml.jackson.databind.JsonNode
+import org.apache.kylin.cluster.{AvailableResource, ResourceInfo}
+import org.apache.kylin.engine.spark.application.SparkApplication
+import org.apache.kylin.engine.spark.job.KylinBuildEnv
+
+import scala.collection.JavaConverters._
+
+class CapacitySchedulerParser extends SchedulerParser {
+
+  override def availableResource(queueName: String): AvailableResource = {
+    val queues: JList[JsonNode] = root.findParents("queueName")
+    val nodes = queues.asScala.filter(queue => parseValue(queue.get("queueName")).equals(queueName))
+    require(nodes.size == 1)
+
+    var (queueAvailable, queueMax) = queueCapacity(nodes.head)
+    val totalResource = calTotalResource(nodes.head)
+    val clusterNode = root.findValue("schedulerInfo")
+    val cluster = clusterAvailableCapacity(clusterNode)
+    var min = Math.min(queueAvailable, cluster)
+    logInfo(s"queueAvailable is ${queueAvailable}, min is ${min}, queueMax is ${queueMax}")
+    if (KylinBuildEnv.get().kylinConfig.useDynamicResourcePlan() && queueMax == 0.0) {
+      logInfo("configure yarn queue using dynamic resource plan in capacity scheduler")
+      queueMax = 1.0
+    }
+    if (KylinBuildEnv.get().kylinConfig.useDynamicResourcePlan() && min == 0.0) {
+      logInfo("configure yarn queue using dynamic resource plan in capacity scheduler")
+      min = 1.0
+    }
+    var resource = AvailableResource(totalResource.percentage(min), totalResource.percentage(queueMax))
+    try {
+      val queueAvailableRes = KylinBuildEnv.get.clusterManager.fetchQueueStatistics(queueName)
+      resource = AvailableResource(queueAvailableRes, totalResource.percentage(queueMax))
+    } catch {
+      case e: Error =>
+        logInfo(s"The current hadoop version does not support QueueInfo.getQueueStatistics method.")
+        None
+    }
+
+    logInfo(s"Capacity actual available resource: $resource.")
+    resource
+  }
+
+ private def clusterAvailableCapacity(node: JsonNode): Double = {
+ val max = parseValue(node.get("capacity")).toDouble
+ val used = parseValue(node.get("usedCapacity")).toDouble
+ val capacity = (max - used) / 100
+ logInfo(s"Cluster available capacity: $capacity.")
+ capacity
+ }
+
+ private def queueCapacity(node: JsonNode): (Double, Double) = {
+ val max = parseValue(node.get("absoluteMaxCapacity")).toDouble
+ val used = parseValue(node.get("absoluteUsedCapacity")).toDouble
+ val available = (max - used) / 100
+ logInfo(s"Queue available capacity: $available.")
+ (available, max / 100)
+ }
+
+  private def calTotalResource(node: JsonNode): ResourceInfo = {
+    val usedMemory = parseValue(node.get("resourcesUsed").get("memory")).toInt
+    if (usedMemory != 0) {
+      val usedCapacity = parseValue(node.get("absoluteUsedCapacity")).toDouble / 100
+      val resource = ResourceInfo(Math.floor(usedMemory / usedCapacity).toInt, Int.MaxValue)
+      logInfo(s"Estimate total cluster resource is $resource.")
+      resource
+    } else {
+      logInfo("Current queue used memory is 0, treat available resource as infinite.")
+      ResourceInfo(Int.MaxValue, Int.MaxValue)
+    }
+  }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala
new file mode 100644
index 0000000000..f2c9125191
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cluster.parser
+
+import java.util.{List => JList}
+
+import com.fasterxml.jackson.databind.JsonNode
+import org.apache.kylin.cluster.{AvailableResource, ResourceInfo}
+
+import scala.collection.JavaConverters._
+
+class FairSchedulerParser extends SchedulerParser {
+
+ override def availableResource(queueName: String): AvailableResource = {
+ val cluster = clusterAvailableResource()
+ val queue = queueAvailableResource(queueName)
+    val resource = AvailableResource(cluster.available.reduceMin(queue.available), queue.max)
+ logInfo(s"Current actual available resource: $resource.")
+ resource
+ }
+
+ private def queueAvailableResource(queueName: String): AvailableResource = {
+ val queues: JList[JsonNode] = root.findParents("queueName")
+    val nodes = queues.asScala.filter(queue => parseValue(queue.get("queueName")).equals(queueName))
+ require(nodes.size == 1)
+ val resource = calAvailableResource(nodes.head)
+ logInfo(s"Queue available resource: $resource.")
+ resource
+ }
+
+ private def clusterAvailableResource(): AvailableResource = {
+ val node = root.findValue("rootQueue")
+ require(node != null)
+ val resource = calAvailableResource(node)
+ logInfo(s"Cluster available resource: $resource.")
+ resource
+ }
+
+ private def calAvailableResource(node: JsonNode): AvailableResource = {
+ val maxResources = node.get("maxResources")
+ val maxMemory = parseValue(maxResources.get("memory")).toInt
+ val maxCores = parseValue(maxResources.get("vCores")).toInt
+ val usedResources = node.get("usedResources")
+ val usedMemory = parseValue(usedResources.get("memory")).toInt
+ val usedCores = parseValue(usedResources.get("vCores")).toInt
+    AvailableResource(ResourceInfo(maxMemory - usedMemory, maxCores - usedCores), ResourceInfo(maxMemory, maxCores))
+ }
+}
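
For illustration only, a self-contained sketch of the fields this parser reads; the JSON below mimics the fair-scheduler REST layout and reduceMin is assumed to take the component-wise minimum:

    val info =
      """{"scheduler":{"schedulerInfo":{"type":"fairScheduler",
        |  "rootQueue":{"queueName":"root",
        |    "maxResources":{"memory":102400,"vCores":100},
        |    "usedResources":{"memory":20480,"vCores":20},
        |    "childQueues":{"queue":[{"queueName":"root.default",
        |      "maxResources":{"memory":51200,"vCores":50},
        |      "usedResources":{"memory":10240,"vCores":10}}]}}}}}""".stripMargin
    val parser = new FairSchedulerParser
    parser.parse(info)
    // available = min(cluster available, queue available); max = queue max resources
    parser.availableResource("root.default")
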
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala
new file mode 100644
index 0000000000..9db043abc7
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cluster.parser
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.kylin.cluster.AvailableResource
+import org.apache.spark.internal.Logging
+
+trait SchedulerParser extends Logging {
+ protected var root: JsonNode = _
+ protected lazy val mapper = new ObjectMapper
+
+ def availableResource(queueName: String): AvailableResource
+
+ def parse(schedulerInfo: String): Unit = {
+ this.root = mapper.readTree(schedulerInfo)
+ }
+
+  // some scheduler info formats wrap values in quotes, e.g. "value"; strip the quotes and return the raw text
+ protected def parseValue(node: JsonNode): String = {
+ if (node.toString.startsWith("\"")) {
+ node.toString.replace("\"", "")
+ } else {
+ node.toString
+ }
+ }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala
new file mode 100644
index 0000000000..315ae3155d
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cluster.parser
+
+import org.apache.kylin.common.util.JsonUtil
+import org.apache.spark.internal.Logging
+
+object SchedulerParserFactory extends Logging {
+  def create(info: String): SchedulerParser = {
+    try {
+      val schedulerType = JsonUtil.readValueAsTree(info).findValue("type").toString
+      // use contains rather than equals because the value is quoted, for example: {"type":"fairScheduler"}
+      // do not support FifoScheduler for now
+      if (schedulerType.contains("capacityScheduler")) {
+        new CapacitySchedulerParser
+      } else if (schedulerType.contains("fairScheduler")) {
+        new FairSchedulerParser
+      } else {
+        throw new IllegalArgumentException(s"Unsupported scheduler type from scheduler info. $schedulerType")
+      }
+    } catch {
+      case throwable: Throwable =>
+        logError(s"Invalid scheduler info. $info")
+        throw throwable
+    }
+  }
+}
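
A small dispatch sketch for the factory above; the JSON fragments are illustrative, only the "type" field matters here:

    val capacityInfo = """{"scheduler":{"schedulerInfo":{"type":"capacityScheduler"}}}"""
    val fairInfo = """{"scheduler":{"schedulerInfo":{"type":"fairScheduler"}}}"""

    SchedulerParserFactory.create(capacityInfo) // -> CapacitySchedulerParser
    SchedulerParserFactory.create(fairInfo)     // -> FairSchedulerParser
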
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala
new file mode 100644
index 0000000000..d9d80f4298
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.scheduler
+
+import org.apache.kylin.common.KapConfig
+import org.apache.kylin.engine.spark.job.KylinBuildEnv
+import org.apache.kylin.engine.spark.utils.ThreadUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
+
+
+class ClusterMonitor extends Logging {
+ private lazy val scheduler = //
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("connect-master-guard")
+ private val JOB_STEP_PREFIX = "job_step_"
+
+ def scheduleAtFixedRate(func: () => Unit, period: Long): Unit = {
+ scheduler.scheduleAtFixedRate(new Runnable {
+ override def run(): Unit = func.apply()
+ }, period, period, TimeUnit.SECONDS)
+ }
+
+  def monitorSparkMaster(atomicBuildEnv: AtomicReference[KylinBuildEnv], atomicSparkSession: AtomicReference[SparkSession],
+                         disconnectTimes: AtomicLong, atomicUnreachableSparkMaster: AtomicBoolean): Unit = {
+    val config = atomicBuildEnv.get().kylinConfig
+    if (KapConfig.wrap(config).isCloud && !config.isUTEnv) {
+      val disconnectMaxTimes = config.getClusterManagerHealthCheckMaxTimes
+      if (disconnectMaxTimes >= 0) {
+        val connectPeriod = config.getClusterManagerHealCheckIntervalSecond
+        logDebug(s"ClusterMonitor thread starts with max times $disconnectMaxTimes and period $connectPeriod")
+        scheduleAtFixedRate(() => {
+          monitor(atomicBuildEnv, atomicSparkSession, disconnectTimes, atomicUnreachableSparkMaster)
+        }, connectPeriod)
+      }
+    }
+  }
+
+  def monitor(atomicBuildEnv: AtomicReference[KylinBuildEnv], atomicSparkSession: AtomicReference[SparkSession],
+              disconnectTimes: AtomicLong, atomicUnreachableSparkMaster: AtomicBoolean): Unit = {
+    logDebug("monitor start")
+    val config = atomicBuildEnv.get().kylinConfig
+    val disconnectMaxTimes = config.getClusterManagerHealthCheckMaxTimes
+    try {
+      val clusterManager = atomicBuildEnv.get.clusterManager
+      clusterManager.applicationExisted(JOB_STEP_PREFIX + atomicBuildEnv.get().buildJobInfos.getJobStepId)
+      disconnectTimes.set(0)
+      atomicUnreachableSparkMaster.set(false)
+      logDebug("monitor stop")
+    } catch {
+      case e: Exception =>
+        logError(s"monitor error with: ${e.getMessage}")
+        if (disconnectTimes.incrementAndGet() >= disconnectMaxTimes && atomicSparkSession.get() != null) {
+          logDebug(s"Job will stop: unable to connect to the Spark master within the maximum timeout")
+          atomicUnreachableSparkMaster.set(true)
+          atomicSparkSession.get().stop()
+        }
+    }
+  }
+
+ def shutdown(): Unit = {
+ scheduler.shutdownNow()
+ }
+}
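
A wiring sketch for the monitor above; the build env and Spark session are assumed to come from the running job:

    import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
    import org.apache.kylin.engine.spark.job.KylinBuildEnv
    import org.apache.spark.sql.SparkSession

    val clusterMonitor = new ClusterMonitor
    val envRef = new AtomicReference[KylinBuildEnv](KylinBuildEnv.get())
    val sessionRef = new AtomicReference[SparkSession](sparkSession) // sparkSession: assumed already in scope
    val disconnectTimes = new AtomicLong(0)
    val unreachableMaster = new AtomicBoolean(false)

    clusterMonitor.monitorSparkMaster(envRef, sessionRef, disconnectTimes, unreachableMaster)
    // ... and on job shutdown:
    clusterMonitor.shutdown()
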
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala
new file mode 100644
index 0000000000..ee72d16e51
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.kylin.engine.spark.utils.ThreadUtils
+
+class JobRuntime(val maxThreadCount: Int) {
+
+ private lazy val minThreads = 1
+ private lazy val maxThreads = Math.max(minThreads, maxThreadCount)
+ // Maybe we should parameterize nThreads.
+ private lazy val threadPool = //
+ ThreadUtils.newDaemonScalableThreadPool("build-thread", //
+ minThreads, maxThreads, 20, TimeUnit.SECONDS)
+
+ // Drain layout result using single thread.
+ private lazy val scheduler = //
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("build-scheduler")
+
+ def submit(fun: () => Unit): Unit = {
+ threadPool.submit(new Runnable {
+ override def run(): Unit = fun.apply()
+ })
+ }
+
+ def scheduleCheckpoint(fun: () => Unit): Unit = {
+    scheduler.scheduleWithFixedDelay(() => fun.apply(), 30L, 30L, TimeUnit.SECONDS)
+ }
+
+ def schedule(func: () => Unit, delay: Long, unit: TimeUnit): Unit = {
+ scheduler.schedule(new Runnable {
+ override def run(): Unit = func.apply()
+ }, delay, unit)
+ }
+
+ def shutdown(): Unit = {
+ scheduler.shutdownNow()
+ threadPool.shutdownNow()
+ }
+
+}
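
A minimal usage sketch of JobRuntime; buildSegment and drainLayout below are placeholders for the real build work:

    def buildSegment(): Unit = () // placeholder for a build task
    def drainLayout(): Unit = ()  // placeholder for the periodic checkpoint

    val runtime = new JobRuntime(maxThreadCount = 4)
    runtime.submit(() => buildSegment())            // runs on the scalable build pool
    runtime.scheduleCheckpoint(() => drainLayout()) // runs every 30s on the scheduler thread
    // ...
    runtime.shutdown()
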
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala
new file mode 100644
index 0000000000..d0de86c938
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.scheduler
+
+sealed trait KylinJobEvent
+
+sealed trait JobStatus extends KylinJobEvent
+
+case class JobSucceeded() extends JobStatus
+
+case class JobFailed(reason: String, throwable: Throwable) extends JobStatus
+
+sealed trait JobEndReason extends KylinJobEvent
+
+sealed trait JobFailedReason extends JobEndReason
+
+case class ResourceLack(throwable: Throwable) extends JobFailedReason
+
+case class ExceedMaxRetry(throwable: Throwable) extends JobFailedReason
+
+case class UnknownThrowable(throwable: Throwable) extends JobFailedReason
+
+
+sealed trait JobCommand extends KylinJobEvent
+
+case class RunJob() extends JobCommand
+
+
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala
new file mode 100644
index 0000000000..686a183c32
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.scheduler
+
+trait KylinJobListener {
+ def onReceive(event: KylinJobEvent): Unit
+}
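
A sketch of a listener dispatching on the sealed event hierarchy defined in KylinJobEvent.scala:

    val listener = new KylinJobListener {
      override def onReceive(event: KylinJobEvent): Unit = event match {
        case _: RunJob          => () // start the build
        case _: JobSucceeded    => () // report success
        case _: JobFailed       => () // report failure with its reason
        case _: JobFailedReason => () // ResourceLack / ExceedMaxRetry / UnknownThrowable
        case _                  => ()
      }
    }
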
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala
new file mode 100644
index 0000000000..98a5d7e5da
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.application
+
+import org.apache.kylin.engine.spark.job.KylinBuildEnv
+import org.apache.kylin.engine.spark.scheduler._
+import io.netty.util.internal.ThrowableUtil
+import org.apache.kylin.common.util.Unsafe
+import org.apache.spark.autoheal.ExceptionTerminator
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.KylinJobEventLoop
+
+class JobMonitor(eventLoop: KylinJobEventLoop) extends Logging {
+ var retryTimes = 0
+ eventLoop.registerListener(new KylinJobListener {
+ override def onReceive(event: KylinJobEvent): Unit = {
+ event match {
+ case rl: ResourceLack => handleResourceLack(rl)
+ case ut: UnknownThrowable => handleUnknownThrowable(ut)
+ case emr: ExceedMaxRetry => handleExceedMaxRetry(emr)
+ case _ =>
+ }
+ }
+ })
+
+ def stop(): Unit = {
+ }
+
+ def handleResourceLack(rl: ResourceLack): Unit = {
+ try {
+ if (rl.throwable != null && rl.throwable.getCause != null) {
+ if (rl.throwable.getCause.isInstanceOf[NoRetryException]) {
+          eventLoop.post(JobFailed(rl.throwable.getCause.getMessage, rl.throwable.getCause))
+ return
+ }
+ }
+
+ logInfo(s"handleResourceLack --> ${rl.throwable.getCause}")
+ val buildEnv = KylinBuildEnv.get()
+ // if killed, job failed without retry
+ val jobStepId = buildEnv.buildJobInfos.getJobStepId
+ if (buildEnv.clusterManager.isApplicationBeenKilled(jobStepId)) {
+ eventLoop.post(JobFailed(s"Submitted application $jobStepId has been
killed.", rl.throwable))
+ return
+ }
+ retryTimes += 1
+ KylinBuildEnv.get().buildJobInfos.recordRetryTimes(retryTimes)
+ val maxRetry = buildEnv.kylinConfig.getSparkEngineMaxRetryTime
+ if (retryTimes <= maxRetry) {
+ logError(s"Job failed $retryTimes time(s). Cause:", rl.throwable)
+ Unsafe.setProperty("kylin.spark-conf.auto-prior", "false")
+ ExceptionTerminator.resolveException(rl, eventLoop)
+ } else {
+ eventLoop.post(ExceedMaxRetry(rl.throwable))
+ }
+ } catch {
+ case throwable: Throwable => eventLoop.post(JobFailed("Error occurred
when generate retry configuration.", throwable))
+ }
+ }
+
+
+ def handleExceedMaxRetry(emr: ExceedMaxRetry): Unit = {
+ eventLoop.post(JobFailed("Retry times exceed MaxRetry set in the
KylinConfig.", emr.throwable))
+ }
+
+ def handleUnknownThrowable(ur: UnknownThrowable): Unit = {
+ eventLoop.post(JobFailed("Unknown error occurred during the job.",
ur.throwable))
+ }
+}
+
+case class RetryInfo(overrideConf: java.util.Map[String, String], throwable: Throwable) {
+ override def toString: String = {
+ s"""RetryInfo{
+ | overrideConf : $overrideConf,
+ | throwable : ${ThrowableUtil.stackTraceToString(throwable)}
+ |}""".stripMargin
+ }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
new file mode 100644
index 0000000000..1837b92fa0
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.application
+
+import io.kyligence.kap.engine.spark.job.ParamsConstants
+import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.kylin.common.util.{JsonUtil, Unsafe}
+import org.apache.kylin.engine.spark.application.SparkApplication
+import org.apache.kylin.engine.spark.job.KylinBuildEnv
+import org.apache.kylin.engine.spark.scheduler._
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.KylinJobEventLoop
+
+import java.util
+import java.util.concurrent.CountDownLatch
+
+/**
+ * Spark driver part, construct the real spark job [SparkApplication]
+ */
+object JobWorkSpace extends Logging {
+ def execute(args: Array[String]): Unit = {
+ try {
+ val (application, appArgs) = resolveArgs(args)
+ val eventLoop = new KylinJobEventLoop
+ val worker = new JobWorker(application, appArgs, eventLoop)
+ val monitor = new JobMonitor(eventLoop)
+ val workspace = new JobWorkSpace(eventLoop, monitor, worker)
+ val statusCode = workspace.run()
+ if (statusCode != 0) {
+ Unsafe.systemExit(statusCode)
+ }
+ } catch {
+ case throwable: Throwable =>
+ logError("Error occurred when init job workspace.", throwable)
+ Unsafe.systemExit(1)
+ }
+ }
+
+  def resolveArgs(args: Array[String]): (SparkApplication, Array[String]) = {
+    if (args.length < 2 || args(0) != "-className") throw new IllegalArgumentException("className is required")
+    val className = args(1)
+    // scalastyle:off
+    val o = Class.forName(className).newInstance
+    // scalastyle:on
+    if (!o.isInstanceOf[SparkApplication]) throw new IllegalArgumentException(className + " is not a subclass of SparkApplication")
+    val appArgs = args.slice(2, args.length)
+    val application = o.asInstanceOf[SparkApplication]
+    (application, appArgs)
+  }
+}
+
+class JobWorkSpace(eventLoop: KylinJobEventLoop, monitor: JobMonitor, worker: JobWorker) extends Logging {
+ require(eventLoop != null)
+ require(monitor != null)
+ require(worker != null)
+
+ private var statusCode: Int = 0
+ private val latch = new CountDownLatch(1)
+
+ eventLoop.registerListener(new KylinJobListener {
+ override def onReceive(event: KylinJobEvent): Unit = {
+ event match {
+ case _: JobSucceeded => success()
+ case jf: JobFailed => fail(jf)
+ case _ =>
+ }
+ }
+ })
+
+ def run(): Int = {
+ eventLoop.start()
+ eventLoop.post(RunJob())
+ latch.await()
+ statusCode
+ }
+
+ def success(): Unit = {
+ try {
+ stop()
+ } finally {
+ statusCode = 0
+ latch.countDown()
+ }
+ }
+
+ def fail(jf: JobFailed): Unit = {
+ try {
+ logError(s"Job failed eventually. Reason: ${jf.reason}", jf.throwable)
+      KylinBuildEnv.get().buildJobInfos.recordJobRetryInfos(RetryInfo(new util.HashMap, jf.throwable))
+ updateJobErrorInfo(jf)
+ stop()
+ } finally {
+ statusCode = 1
+ latch.countDown()
+ }
+ }
+
+ def stop(): Unit = {
+ monitor.stop()
+ worker.stop()
+ eventLoop.stop()
+ }
+
+ def updateJobErrorInfo(jf: JobFailed): Unit = {
+ val infos = KylinBuildEnv.get().buildJobInfos
+ val context = worker.getApplication
+
+ val project = context.getProject
+ val jobId = context.getJobId
+
+ val stageId = infos.getStageId
+    val jobStepId = StringUtils.replace(infos.getJobStepId, SparkApplication.JOB_NAME_PREFIX, "")
+ val failedStepId = if (StringUtils.isBlank(stageId)) jobStepId else stageId
+
+ val failedSegmentId = infos.getSegmentId
+ val failedStack = ExceptionUtils.getStackTrace(jf.throwable)
+    val failedReason =
+      if (context.getAtomicUnreachableSparkMaster.get()) "Unable to connect to the Spark master within the maximum timeout"
+      else jf.reason
+ val url = "/kylin/api/jobs/error"
+
+    val payload: util.HashMap[String, Object] = new util.HashMap[String, Object](5)
+ payload.put("project", project)
+ payload.put("job_id", jobId)
+ payload.put("failed_step_id", failedStepId)
+ payload.put("failed_segment_id", failedSegmentId)
+ payload.put("failed_stack", failedStack)
+ payload.put("failed_reason", failedReason)
+ val json = JsonUtil.writeValueAsString(payload)
+ val params = new util.HashMap[String, String]()
+ val config = KylinBuildEnv.get().kylinConfig
+    params.put(ParamsConstants.TIME_OUT, config.getUpdateJobInfoTimeout.toString)
+ params.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true))
+ context.getReport.updateSparkJobInfo(params, url, json);
+ }
+}
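
The entry contract enforced by resolveArgs, sketched with placeholder values; the class name and argument path below are not real:

    // args(0) must be "-className", args(1) a SparkApplication subclass; the rest go to the job.
    JobWorkSpace.execute(Array("-className", "org.example.MyBuildJob", "/path/to/job_args.json"))
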
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
new file mode 100644
index 0000000000..069ac171cc
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.application
+
+import java.util.concurrent.Executors
+
+import org.apache.kylin.engine.spark.application.SparkApplication
+import org.apache.kylin.engine.spark.scheduler._
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.KylinJobEventLoop
+
+class JobWorker(application: SparkApplication, args: Array[String], eventLoop: KylinJobEventLoop) extends Logging {
+ private val pool = Executors.newSingleThreadExecutor()
+
+ def getApplication: SparkApplication = application
+
+ eventLoop.registerListener(new KylinJobListener {
+ override def onReceive(event: KylinJobEvent): Unit = {
+ event match {
+ case _: RunJob => runJob()
+ case _ =>
+ }
+ }
+ })
+
+ def stop(): Unit = {
+ pool.shutdownNow()
+ application.logJobInfo()
+ }
+
+ private def runJob(): Unit = {
+ execute()
+ }
+
+
+ private def execute(): Unit = {
+ pool.execute(new Runnable {
+ override def run(): Unit = {
+ try {
+ application.execute(args)
+ eventLoop.post(JobSucceeded())
+ } catch {
+          case exception: NoRetryException => eventLoop.post(UnknownThrowable(exception))
+ case throwable: Throwable => eventLoop.post(ResourceLack(throwable))
+ }
+ }
+ })
+ }
+}
+
+class NoRetryException(msg: String) extends java.lang.Exception(msg) {
+ def this() {
+ this(null)
+ }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala
new file mode 100644
index 0000000000..5cefd24054
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.util
+
+import org.apache.kylin.cluster.{AvailableResource, IClusterManager, ResourceInfo}
+import org.apache.kylin.common.KylinConfig
+import org.apache.spark.deploy.DeployMessages.{KillApplication, MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.master.StandaloneClusterManager.masterEndpoints
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcAddress, RpcEnv}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.Utils
+import org.apache.spark.{SecurityManager, SparkConf}
+
+// scalastyle:off
+class StandaloneClusterManager extends IClusterManager with Logging {
+
+ private val JOB_STEP_PREFIX = "job_step_"
+
+ override def fetchMaximumResourceAllocation: ResourceInfo = {
+    val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState)
+ val aliveWorkers = state.workers.filter(_.state == WorkerState.ALIVE)
+ val availableMem = aliveWorkers.map(_.memoryFree).sum
+ val availableCores = aliveWorkers.map(_.coresFree).sum
+ logInfo(s"Get available resource, " +
+ s"availableMem: $availableMem, availableCores: $availableCores")
+ ResourceInfo(availableMem, availableCores)
+ }
+
+  override def fetchQueueAvailableResource(queueName: String): AvailableResource = {
+    val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState)
+    val aliveWorkers = state.workers.filter(_.state == WorkerState.ALIVE)
+    val availableMem = aliveWorkers.map(_.memoryFree).sum
+    val availableCores = aliveWorkers.map(_.coresFree).sum
+    val totalMem = aliveWorkers.map(_.memory).sum
+    val totalCores = aliveWorkers.map(_.cores).sum
+    logInfo(s"Get available resource, " +
+      s"availableMem: $availableMem, availableCores: $availableCores, " +
+      s"totalMem: $totalMem, totalCores: $totalCores")
+    AvailableResource(ResourceInfo(availableMem, availableCores), ResourceInfo(totalMem, totalCores))
+  }
+
+ override def getBuildTrackingUrl(sparkSession: SparkSession): String = {
+ val applicationId = sparkSession.sparkContext.applicationId
+ logInfo(s"Get tracking url of application $applicationId")
+    val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState)
+ val app = state.activeApps.find(_.id == applicationId).orNull
+ if (app == null) {
+ logInfo(s"No active application found of applicationId $applicationId")
+ return null
+ }
+ app.desc.appUiUrl
+ }
+
+ override def killApplication(jobStepId: String): Unit = {
+ killApplication(s"$JOB_STEP_PREFIX", jobStepId)
+ }
+
+  override def killApplication(jobStepPrefix: String, jobStepId: String): Unit = {
+    val master = masterEndpoints(0)
+    val state = master.askSync[MasterStateResponse](RequestMasterState)
+    val app = state.activeApps.find(_.desc.name.equals(jobStepPrefix + jobStepId)).orNull
+ if (app == null) {
+ logInfo(s"No active application found of jobStepId $jobStepId")
+ return
+ }
+ logInfo(s"Kill application ${app.id} by jobStepId $jobStepId")
+ master.send(KillApplication(app.id))
+ }
+
+ override def getRunningJobs(queues: util.Set[String]): util.List[String] = {
+    val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState)
+ val jobStepNames = state.activeApps.map(_.desc.name)
+ logInfo(s"Get running jobs ${jobStepNames.toSeq}")
+ import scala.collection.JavaConverters._
+ jobStepNames.toList.asJava
+ }
+
+ override def fetchQueueStatistics(queueName: String): ResourceInfo = {
+ fetchMaximumResourceAllocation
+ }
+
+ override def isApplicationBeenKilled(jobStepId: String): Boolean = {
+ val master = masterEndpoints(0)
+ val state = master.askSync[MasterStateResponse](RequestMasterState)
+    val app = state.completedApps.find(_.desc.name.equals(s"$JOB_STEP_PREFIX$jobStepId")).orNull
+ if (app == null) {
+ false
+ } else {
+ "KILLED".equals(app.state.toString)
+ }
+ }
+
+ override def applicationExisted(jobId: String): Boolean = {
+ val master = masterEndpoints(0)
+ val state = master.askSync[MasterStateResponse](RequestMasterState)
+ val app = state.activeApps.find(_.desc.name.equalsIgnoreCase(jobId)).orNull
+    app != null
+  }
+
+}
+
+object StandaloneClusterManager extends Logging {
+
+ private val ENDPOINT_NAME = "Master"
+ private val CLIENT_NAME = "kylinStandaloneClient"
+ private val SPARK_LOCAL = "local"
+ private val SPARK_MASTER = "spark.master"
+ private val SPARK_RPC_TIMEOUT = "spark.rpc.askTimeout"
+ private val SPARK_AUTHENTICATE = "spark.authenticate"
+ private val SPARK_AUTHENTICATE_SECRET = "spark.authenticate.secret"
+ private val SPARK_NETWORK_CRYPTO_ENABLED = "spark.network.crypto.enabled"
+
+ private lazy val masterEndpoints = {
+ val overrideConfig = KylinConfig.getInstanceFromEnv.getSparkConfigOverride
+ val conf = new SparkConf()
+    if (!conf.contains(SPARK_MASTER) || conf.get(SPARK_MASTER).startsWith(SPARK_LOCAL)) {
+      conf.set(SPARK_MASTER, overrideConfig.get(SPARK_MASTER))
+    }
+    if (!conf.contains(SPARK_RPC_TIMEOUT)) {
+      conf.set(SPARK_RPC_TIMEOUT, "10s")
+    }
+    if (overrideConfig.containsKey(SPARK_AUTHENTICATE) && "true".equals(overrideConfig.get(SPARK_AUTHENTICATE))) {
+      conf.set(SPARK_AUTHENTICATE, "true")
+      conf.set(SPARK_AUTHENTICATE_SECRET, overrideConfig.getOrDefault(SPARK_AUTHENTICATE_SECRET, "kylin"))
+      conf.set(SPARK_NETWORK_CRYPTO_ENABLED, overrideConfig.getOrDefault(SPARK_NETWORK_CRYPTO_ENABLED, "true"))
+    }
+    logInfo(s"Spark master ${conf.get(SPARK_MASTER)}")
+    val rpcEnv = RpcEnv.create(CLIENT_NAME, Utils.localHostName(), 0, conf, new SecurityManager(conf))
+ val masterUrls = conf.get(SPARK_MASTER)
+ Utils.parseStandaloneMasterUrls(masterUrls)
+ .map(RpcAddress.fromSparkURL)
+ .map(rpcEnv.setupEndpointRef(_, ENDPOINT_NAME))
+ }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala
new file mode 100644
index 0000000000..38f1554a01
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.kylin.engine.spark.scheduler.{KylinJobEvent, KylinJobListener}
+import org.apache.spark.util.EventLoop
+
+import scala.collection.JavaConverters._
+
+class KylinJobEventLoop extends EventLoop[KylinJobEvent]("spark-entry-event-loop") {
+  // register listener in single thread, so LinkedList is enough
+  private var listeners: JList[KylinJobListener] = new util.LinkedList[KylinJobListener]()
+
+ def registerListener(listener: KylinJobListener): Boolean = {
+ listeners.add(listener)
+ }
+
+ def unregisterListener(listener: KylinJobListener): Boolean = {
+ listeners.remove(listener)
+ }
+
+ override protected def onReceive(event: KylinJobEvent): Unit = {
+ listeners.asScala.foreach(_.onReceive(event))
+ }
+
+ override protected def onError(e: Throwable): Unit = {
+
+ }
+}
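
A minimal sketch of the event-loop wiring used by JobWorkSpace, JobWorker and JobMonitor:

    import org.apache.kylin.engine.spark.scheduler.{KylinJobEvent, KylinJobListener, RunJob}

    val eventLoop = new KylinJobEventLoop
    eventLoop.registerListener(new KylinJobListener {
      override def onReceive(event: KylinJobEvent): Unit = println(s"received $event")
    })
    eventLoop.start()
    eventLoop.post(RunJob())
    // ...
    eventLoop.stop()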