This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new d8974e8525 KYLIN-5217 Create a initial commit
d8974e8525 is described below

commit d8974e85252f7dd6732293beb6387d277df56057
Author: longfei.jiang <longfei.ji...@kyligence.io>
AuthorDate: Tue Aug 9 14:22:39 2022 +0800

    KYLIN-5217 Create a initial commit
---
 .../kylin/cluster/ClusterManagerFactory.scala      |  37 ++++
 .../apache/kylin/cluster/K8sClusterManager.scala   |  65 +++++++
 .../kylin/cluster/SchedulerInfoCmdHelper.scala     | 167 +++++++++++++++++
 .../kylin/cluster/StandaloneClusterManager.scala   |  22 +++
 .../apache/kylin/cluster/YarnClusterManager.scala  | 206 +++++++++++++++++++++
 .../cluster/parser/CapacitySchedulerParser.scala   |  92 +++++++++
 .../kylin/cluster/parser/FairSchedulerParser.scala |  64 +++++++
 .../kylin/cluster/parser/SchedulerParser.scala     |  43 +++++
 .../cluster/parser/SchedulerParserFactory.scala    |  43 +++++
 .../engine/spark/scheduler/ClusterMonitor.scala    |  82 ++++++++
 .../kylin/engine/spark/scheduler/JobRuntime.scala  |  59 ++++++
 .../engine/spark/scheduler/KylinJobEvent.scala     |  44 +++++
 .../engine/spark/scheduler/KylinJobListener.scala  |  23 +++
 .../org/apache/spark/application/JobMonitor.scala  |  94 ++++++++++
 .../apache/spark/application/JobWorkSpace.scala    | 153 +++++++++++++++
 .../org/apache/spark/application/JobWorker.scala   |  71 +++++++
 .../deploy/master/StandaloneClusterManager.scala   | 157 ++++++++++++++++
 .../apache/spark/scheduler/KylinJobEventLoop.scala |  48 +++++
 18 files changed, 1470 insertions(+)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala
new file mode 100644
index 0000000000..6009232f57
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cluster
+
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import io.kyligence.kap.guava20.shaded.common.util.concurrent.SimpleTimeLimiter
+import org.apache.kylin.common.KylinConfig
+import org.apache.spark.util.KylinReflectUtils
+
+
+object ClusterManagerFactory {
+
+  val pool: ExecutorService = Executors.newCachedThreadPool
+
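+  // Creates the configured IClusterManager via reflection and wraps it in a time-limited proxy,
+  // so each call fails once the configured timeout threshold (in seconds) is exceeded.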
+  def create(kylinConfig: KylinConfig): IClusterManager = {
+    val timeLimiter = SimpleTimeLimiter.create(pool)
+    val target = KylinReflectUtils.createObject(kylinConfig.getClusterManagerClassName)._1.asInstanceOf[IClusterManager]
+
+    timeLimiter.newProxy(target, classOf[IClusterManager], kylinConfig.getClusterManagerTimeoutThreshold, TimeUnit.SECONDS);
+  }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala
new file mode 100644
index 0000000000..8711622885
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.cluster
+
+import java.util
+
+import com.google.common.collect.Lists
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+class K8sClusterManager extends IClusterManager with Logging {
+  override def fetchMaximumResourceAllocation: ResourceInfo = {
+    ResourceInfo(Int.MaxValue, 1000)
+  }
+
+  override def fetchQueueAvailableResource(queueName: String): AvailableResource = {
+    AvailableResource(ResourceInfo(Int.MaxValue, 1000), ResourceInfo(Int.MaxValue, 1000))
+  }
+
+  override def getBuildTrackingUrl(sparkSession: SparkSession): String = {
+    logInfo("Get Build Tracking Url!")
+    val applicationId = sparkSession.sparkContext.applicationId
+    ""
+  }
+
+  override def killApplication(jobStepId: String): Unit = {
+    logInfo(s"Kill Application $jobStepId !")
+  }
+
+  override def killApplication(jobStepPrefix: String, jobStepId: String): Unit = {
+    logInfo(s"Kill Application $jobStepPrefix $jobStepId !")
+  }
+
+  override def isApplicationBeenKilled(applicationId: String): Boolean = {
+    true
+  }
+
+  override def getRunningJobs(queues: util.Set[String]): util.List[String] = {
+    Lists.newArrayList()
+  }
+
+  override def fetchQueueStatistics(queueName: String): ResourceInfo = {
+    ResourceInfo(Int.MaxValue, 1000)
+  }
+
+  override def applicationExisted(jobId: String): Boolean = {
+    false
+  }
+
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala
new file mode 100644
index 0000000000..95c35a8a75
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cluster
+
+import org.apache.kylin.engine.spark.utils.StorageUtils
+import io.netty.util.internal.ThrowableUtil
+import org.apache.hadoop.yarn.conf.{HAUtil, YarnConfiguration}
+import org.apache.kylin.common.util.{JsonUtil, ShellException}
+import org.apache.spark.internal.Logging
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.nio.charset.StandardCharsets
+
+object SchedulerInfoCmdHelper extends Logging {
+  private val useHttps: Boolean = YarnConfiguration.useHttps(StorageUtils.getCurrentYarnConfiguration)
+
+  def schedulerInfo: String = {
+    val cmds = getSocketAddress.map(address => genCmd(address._1, address._2))
+    getInfoByCmds(cmds)
+  }
+
+  def metricsInfo: String = {
+    val cmds = getSocketAddress.map(address => genMetricsCmd(address._1, address._2))
+    getInfoByCmds(cmds)
+  }
+
+  private[cluster] def getInfoByCmds(cmds: Iterable[String]): String = {
+    val results = cmds.map { cmd =>
+      try {
+        logInfo(s"Executing the command: ${cmd}")
+        execute(cmd)
+      } catch {
+        case throwable: Throwable => (1, ThrowableUtil.stackTraceToString(throwable))
+      }
+    }
+    val tuples = results.filter(result => result._1 == 0 && JsonUtil.isJson(result._2))
+    if (tuples.isEmpty) {
+      val errors = results.map(_._2).mkString("\n")
+      logWarning(s"Error occurred when getting scheduler info from cmd $cmds")
+      throw new RuntimeException(errors)
+    } else {
+      require(tuples.size == 1)
+      tuples.head._2
+    }
+  }
+
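+  // Resolves the ResourceManager web address(es) from the YARN configuration,
+  // covering both HA and non-HA deployments and honoring the HTTP/HTTPS policy.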
+  private[cluster] def getSocketAddress: Map[String, Int] = {
+    val conf = StorageUtils.getCurrentYarnConfiguration
+    val addresses = if (HAUtil.isHAEnabled(conf)) {
+      val haIds = HAUtil.getRMHAIds(conf).toArray
+      require(haIds.nonEmpty, "HA ids are empty, please check your yarn-site.xml.")
+      if (useHttps) {
+        haIds.map(id => conf.getSocketAddr(s"${YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS}.$id",
+          YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_HTTPS_PORT))
+      } else {
+        haIds.map(id => conf.getSocketAddr(s"${YarnConfiguration.RM_WEBAPP_ADDRESS}.$id",
+          YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_PORT))
+      }
+    } else {
+      if (useHttps) {
+        Array(conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_HTTPS_PORT))
+      } else {
+        Array(conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_PORT))
+      }
+    }
+    addresses.map(address => address.getHostName -> address.getPort).toMap
+  }
+
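+  // Builds a curl command against the ResourceManager scheduler REST endpoint;
+  // -k skips TLS verification and --negotiate enables SPNEGO (Kerberos) authentication.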
+  private[cluster] def genCmd(hostName: String, port: Int): String = {
+    val uri = if (useHttps) {
+      s"https://$hostName:$port/ws/v1/cluster/scheduler";
+    } else {
+      s"http://$hostName:$port/ws/v1/cluster/scheduler";
+    }
+    s"""curl -k --negotiate -u : "$uri""""
+  }
+
+  private[cluster] def genMetricsCmd(hostName: String, port: Int): String = {
+    val uri = if (useHttps) {
+      s"https://$hostName:$port/ws/v1/cluster/metrics";
+    } else {
+      s"http://$hostName:$port/ws/v1/cluster/metrics";
+    }
+    s"""curl -k --negotiate -u : "$uri""""
+  }
+
+  /**
+   * Executes the given command in a local shell and returns only its standard output.
+   *
+   * @param command the shell command to run
+   * @return a tuple of (exit code, stdout)
+   */
+  private[cluster] def execute(command: String): (Int, String) = {
+    try {
+      val cmd = new Array[String](3)
+      val osName = System.getProperty("os.name")
+      if (osName.startsWith("Windows")) {
+        cmd(0) = "cmd.exe"
+        cmd(1) = "/C"
+      }
+      else {
+        cmd(0) = "/bin/bash"
+        cmd(1) = "-c"
+      }
+      cmd(2) = command
+      val builder = new ProcessBuilder(cmd: _*)
+      builder.environment().putAll(System.getenv())
+      val proc = builder.start
+      val resultStdout = new StringBuilder
+      val inReader = new BufferedReader(new InputStreamReader(proc.getInputStream, StandardCharsets.UTF_8.name()))
+      val newLine = System.getProperty("line.separator")
+      var line: String = inReader.readLine()
+      while (line != null) {
+        resultStdout.append(line).append(newLine)
+        line = inReader.readLine()
+      }
+
+      val stderr = new StringBuilder
+      val errorReader = new BufferedReader(new InputStreamReader(proc.getErrorStream, StandardCharsets.UTF_8.name()))
+      line = errorReader.readLine()
+      while (line != null) {
+        stderr.append(line).append(newLine)
+        line = errorReader.readLine()
+      }
+
+      logInfo(s"The corresponding http response for the above command: \n ${stderr}")
+      try {
+        val exitCode = proc.waitFor
+        if (exitCode != 0) {
+          logError(s"executing command $command; exit code: $exitCode")
+          logError(s"==========================[stderr]===============================")
+          logError(stderr.toString)
+          logError(s"==========================[stderr]===============================")
+
+          logError(s"==========================[stdout]===============================")
+          logError(resultStdout.toString)
+          logError(s"==========================[stdout]===============================")
+        }
+        (exitCode, resultStdout.toString)
+      } catch {
+        case e: InterruptedException =>
+          Thread.currentThread.interrupt()
+          throw e
+      }
+    } catch {
+      case e: Exception => throw new ShellException(e)
+    }
+  }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala
new file mode 100644
index 0000000000..c3737a9090
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cluster
+
+class StandaloneClusterManager extends org.apache.spark.deploy.master.StandaloneClusterManager {
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala
new file mode 100644
index 0000000000..c3f6275a0a
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cluster
+
+import java.io.IOException
+import java.util
+import java.util.stream.Collectors
+import java.util.{ArrayList, EnumSet, List, Set}
+
+import com.google.common.collect.{Lists, Sets}
+import org.apache.kylin.cluster.parser.SchedulerParserFactory
+import org.apache.kylin.engine.spark.utils.StorageUtils
+import org.apache.commons.collections.CollectionUtils
+import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, QueueInfo, YarnApplicationState}
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.YarnException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConverters._
+
+class YarnClusterManager extends IClusterManager with Logging {
+
+  import org.apache.kylin.cluster.YarnClusterManager._
+
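+  // Lazily asks the ResourceManager for the maximum memory and vcores a single container may be allocated, and caches the result.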
+  private lazy val maximumResourceAllocation: ResourceInfo = {
+    withYarnClient(yarnClient => {
+      try {
+        val response = yarnClient.createApplication().getNewApplicationResponse
+        val cap = response.getMaximumResourceCapability
+        val resourceInfo = ResourceInfo(cap.getMemory, cap.getVirtualCores)
+        logInfo(s"Cluster maximum resource allocation $resourceInfo")
+        resourceInfo
+      } catch {
+        case throwable: Throwable =>
+          logError("Error occurred when getting resource upper limit per container.", throwable)
+          throw throwable
+      }
+    })
+  }
+
+  override def fetchMaximumResourceAllocation: ResourceInfo = maximumResourceAllocation
+
+  override def fetchQueueAvailableResource(queueName: String): AvailableResource = {
+    val info = SchedulerInfoCmdHelper.schedulerInfo
+    val parser = SchedulerParserFactory.create(info)
+    parser.parse(info)
+    parser.availableResource(queueName)
+  }
+
+  def getYarnApplicationReport(applicationId: String): ApplicationReport = {
+    withYarnClient(yarnClient => {
+      val array = applicationId.split("_")
+      if (array.length < 3) return null
+      val appId = ApplicationId.newInstance(array(1).toLong, array(2).toInt)
+      yarnClient.getApplicationReport(appId)
+    })
+  }
+
+  override def getBuildTrackingUrl(sparkSession: SparkSession): String = {
+    val applicationId = sparkSession.sparkContext.applicationId
+    val applicationReport = getYarnApplicationReport(applicationId)
+    if (null == applicationReport) null
+    else applicationReport.getTrackingUrl
+  }
+
+  def killApplication(jobStepId: String): Unit = {
+    killApplication("job_step_", jobStepId)
+  }
+
+  def killApplication(jobStepPrefix: String, jobStepId: String): Unit = {
+    withYarnClient(yarnClient => {
+      var orphanApplicationId: String = null
+      try {
+        val types: Set[String] = Sets.newHashSet("SPARK")
+        val states: EnumSet[YarnApplicationState] = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
+          YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING)
+        val applicationReports: List[ApplicationReport] = yarnClient.getApplications(types, states)
+        if (CollectionUtils.isEmpty(applicationReports)) return
+        import scala.collection.JavaConverters._
+        for (report <- applicationReports.asScala) {
+          if (report.getName.equalsIgnoreCase(jobStepPrefix + jobStepId)) {
+            orphanApplicationId = report.getApplicationId.toString
+            yarnClient.killApplication(report.getApplicationId)
+            logInfo(s"kill orphan yarn application $orphanApplicationId succeeded, job step $jobStepId")
+          }
+        }
+      } catch {
+        case ex@(_: YarnException | _: IOException) =>
+          logError(s"kill orphan yarn application $orphanApplicationId failed, job step $jobStepId", ex)
+      }
+    })
+  }
+
+  def getRunningJobs(queues: Set[String]): List[String] = {
+    withYarnClient(yarnClient => {
+      if (queues.isEmpty) {
+        val applications = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING))
+        if (null == applications) new ArrayList[String]()
+        else applications.asScala.map(_.getName).asJava
+      } else {
+        val runningJobs: List[String] = new ArrayList[String]()
+        for (queue <- queues.asScala) {
+          val applications = yarnClient.getQueueInfo(queue).getApplications
+          if (null != applications) {
+            applications.asScala.filter(_.getYarnApplicationState == YarnApplicationState.RUNNING).map(_.getName).map(runningJobs.add)
+          }
+        }
+        runningJobs
+      }
+    })
+  }
+
+  override def fetchQueueStatistics(queueName: String): ResourceInfo = {
+    withYarnClient(yarnClient => {
+      val qs = yarnClient.getQueueInfo(queueName).getQueueStatistics
+      ResourceInfo(qs.getAvailableMemoryMB.toInt, qs.getAvailableVCores.toInt)
+    })
+  }
+
+  override def isApplicationBeenKilled(jobStepId: String): Boolean = {
+    withYarnClient(yarnClient => {
+      val types: Set[String] = Sets.newHashSet("SPARK")
+      val states: EnumSet[YarnApplicationState] = EnumSet.of(YarnApplicationState.KILLED)
+      val applicationReports: List[ApplicationReport] = yarnClient.getApplications(types, states)
+      if (!CollectionUtils.isEmpty(applicationReports)) {
+        import scala.collection.JavaConverters._
+        for (report <- applicationReports.asScala) {
+          if (report.getName.equalsIgnoreCase("job_step_" + jobStepId)) {
+            return true
+          }
+        }
+      }
+      false
+    })
+  }
+
+  def applicationExisted(jobId: String): Boolean = {
+    withYarnClient(yarnClient => {
+      val types: util.Set[String] = Sets.newHashSet("SPARK")
+      val states: util.EnumSet[YarnApplicationState] = util.EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
+        YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING)
+      val applicationReports: util.List[ApplicationReport] = yarnClient.getApplications(types, states)
+      if (!CollectionUtils.isEmpty(applicationReports)) {
+        import scala.collection.JavaConverters._
+        for (report <- applicationReports.asScala) {
+          if (report.getName.equalsIgnoreCase(jobId)) {
+            return true
+          }
+        }
+      }
+      return false
+    })
+  }
+
+  def listQueueNames(): util.List[String] = {
+    withYarnClient(yarnClient => {
+      val queues: util.List[QueueInfo] = yarnClient.getAllQueues
+      val queueNames: util.List[String] = Lists.newArrayList()
+      for (queue <- queues.asScala) {
+        queueNames.add(queue.getQueueName)
+      }
+      queueNames
+    })
+  }
+
+}
+
+object YarnClusterManager {
+
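+  // Runs the given body with a freshly created and started YarnClient, closing the client afterwards.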
+  def withYarnClient[T](body: YarnClient => T): T = {
+    val yarnClient = YarnClient.createYarnClient
+    yarnClient.init(getSpecifiedConf)
+    yarnClient.start()
+    try {
+      body(yarnClient)
+    } finally {
+      yarnClient.close()
+    }
+  }
+
+  private def getSpecifiedConf: YarnConfiguration = {
+    // in yarn-cluster mode the driver cannot access the local HADOOP_CONF_DIR
+    val yarnConf = StorageUtils.getCurrentYarnConfiguration
+    // https://issues.apache.org/jira/browse/SPARK-15343
+    yarnConf.set("yarn.timeline-service.enabled", "false")
+    yarnConf
+  }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala
new file mode 100644
index 0000000000..e22700c7e7
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cluster.parser
+
+import java.util.{List => JList}
+import com.fasterxml.jackson.databind.JsonNode
+import org.apache.kylin.cluster.{AvailableResource, ResourceInfo}
+import org.apache.kylin.engine.spark.application.SparkApplication
+import org.apache.kylin.engine.spark.job.KylinBuildEnv
+
+import scala.collection.JavaConverters._
+
+class CapacitySchedulerParser extends SchedulerParser {
+
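+  // Estimates the queue's available resource from its absolute capacity figures and the cluster-wide headroom,
+  // refined with live queue statistics when the Hadoop version supports them.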
+  override def availableResource(queueName: String): AvailableResource = {
+    val queues: JList[JsonNode] = root.findParents("queueName")
+    val nodes = queues.asScala.filter(queue => parseValue(queue.get("queueName")).equals(queueName))
+    require(nodes.size == 1)
+
+    var (queueAvailable, queueMax) = queueCapacity(nodes.head)
+    val totalResource = calTotalResource(nodes.head)
+    val clusterNode = root.findValue("schedulerInfo")
+    val cluster = clusterAvailableCapacity(clusterNode)
+    var min = Math.min(queueAvailable, cluster)
+    logInfo(s"queueAvailable is ${queueAvailable}, min is ${min}, queueMax is ${queueMax}")
+    if (KylinBuildEnv.get().kylinConfig.useDynamicResourcePlan() && queueMax == 0.0) {
+      logInfo("configure yarn queue using dynamic resource plan in capacity scheduler")
+      queueMax = 1.0
+    }
+    if (KylinBuildEnv.get().kylinConfig.useDynamicResourcePlan() && min == 0.0) {
+      logInfo("configure yarn queue using dynamic resource plan in capacity scheduler")
+      min = 1.0
+    }
+    var resource = AvailableResource(totalResource.percentage(min), totalResource.percentage(queueMax))
+    try {
+      val queueAvailableRes = KylinBuildEnv.get.clusterManager.fetchQueueStatistics(queueName)
+      resource = AvailableResource(queueAvailableRes, totalResource.percentage(queueMax))
+    } catch {
+      case e: Error =>
+        logInfo(s"The current hadoop version does not support QueueInfo.getQueueStatistics method.")
+        None
+    }
+
+    logInfo(s"Capacity actual available resource: $resource.")
+    resource
+  }
+
+  private def clusterAvailableCapacity(node: JsonNode): Double = {
+    val max = parseValue(node.get("capacity")).toDouble
+    val used = parseValue(node.get("usedCapacity")).toDouble
+    val capacity = (max - used) / 100
+    logInfo(s"Cluster available capacity: $capacity.")
+    capacity
+  }
+
+  private def queueCapacity(node: JsonNode): (Double, Double) = {
+    val max = parseValue(node.get("absoluteMaxCapacity")).toDouble
+    val used = parseValue(node.get("absoluteUsedCapacity")).toDouble
+    val available = (max - used) / 100
+    logInfo(s"Queue available capacity: $available.")
+    (available, max / 100)
+  }
+
+  private def calTotalResource(node: JsonNode): ResourceInfo = {
+    val usedMemory = parseValue(node.get("resourcesUsed").get("memory")).toInt
+    if (usedMemory != 0) {
+      val usedCapacity = parseValue(node.get("absoluteUsedCapacity")).toDouble / 100
+      val resource = ResourceInfo(Math.floor(usedMemory / usedCapacity).toInt, Int.MaxValue)
+      logInfo(s"Estimate total cluster resource is $resource.")
+      resource
+    } else {
+      logInfo("Current queue used memory is 0, treat available resource as infinite.")
+      ResourceInfo(Int.MaxValue, Int.MaxValue)
+    }
+  }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala
new file mode 100644
index 0000000000..f2c9125191
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cluster.parser
+
+import java.util.{List => JList}
+
+import com.fasterxml.jackson.databind.JsonNode
+import org.apache.kylin.cluster.{AvailableResource, ResourceInfo}
+
+import scala.collection.JavaConverters._
+
+class FairSchedulerParser extends SchedulerParser {
+
+  override def availableResource(queueName: String): AvailableResource = {
+    val cluster = clusterAvailableResource()
+    val queue = queueAvailableResource(queueName)
+    val resource = AvailableResource(cluster.available.reduceMin(queue.available), queue.max)
+    logInfo(s"Current actual available resource: $resource.")
+    resource
+  }
+
+  private def queueAvailableResource(queueName: String): AvailableResource = {
+    val queues: JList[JsonNode] = root.findParents("queueName")
+    val nodes = queues.asScala.filter(queue => parseValue(queue.get("queueName")).equals(queueName))
+    require(nodes.size == 1)
+    val resource = calAvailableResource(nodes.head)
+    logInfo(s"Queue available resource: $resource.")
+    resource
+  }
+
+  private def clusterAvailableResource(): AvailableResource = {
+    val node = root.findValue("rootQueue")
+    require(node != null)
+    val resource = calAvailableResource(node)
+    logInfo(s"Cluster available resource: $resource.")
+    resource
+  }
+
+  private def calAvailableResource(node: JsonNode): AvailableResource = {
+    val maxResources = node.get("maxResources")
+    val maxMemory = parseValue(maxResources.get("memory")).toInt
+    val maxCores = parseValue(maxResources.get("vCores")).toInt
+    val usedResources = node.get("usedResources")
+    val usedMemory = parseValue(usedResources.get("memory")).toInt
+    val usedCores = parseValue(usedResources.get("vCores")).toInt
+    AvailableResource(ResourceInfo(maxMemory - usedMemory, maxCores - usedCores), ResourceInfo(maxMemory, maxCores))
+  }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala
new file mode 100644
index 0000000000..9db043abc7
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cluster.parser
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.kylin.cluster.AvailableResource
+import org.apache.spark.internal.Logging
+
+trait SchedulerParser extends Logging {
+  protected var root: JsonNode = _
+  protected lazy val mapper = new ObjectMapper
+
+  def availableResource(queueName: String): AvailableResource
+
+  def parse(schedulerInfo: String): Unit = {
+    this.root = mapper.readTree(schedulerInfo)
+  }
+
+  // in some scheduler info formats values are wrapped in quotes, e.g. "value"; strip the surrounding quotes
+  protected def parseValue(node: JsonNode): String = {
+    if (node.toString.startsWith("\"")) {
+      node.toString.replace("\"", "")
+    } else {
+      node.toString
+    }
+  }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala
new file mode 100644
index 0000000000..315ae3155d
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cluster.parser
+
+import org.apache.kylin.common.util.JsonUtil
+import org.apache.spark.internal.Logging
+
+object SchedulerParserFactory extends Logging {
+  def create(info: String): SchedulerParser = {
+    try {
+      val schedulerType = JsonUtil.readValueAsTree(info).findValue("type").toString
+      // use contains rather than equals because the value is surrounded with quotes, for example: {"type":"fairScheduler"}
+      // FifoScheduler is not supported for now
+      if (schedulerType.contains("capacityScheduler")) {
+        new CapacitySchedulerParser
+      } else if (schedulerType.contains("fairScheduler")) {
+        new FairSchedulerParser
+      } else {
+        throw new IllegalArgumentException(s"Unsupported scheduler type from scheduler info. $schedulerType")
+      }
+    } catch {
+      case throwable: Throwable =>
+        logError(s"Invalid scheduler info. $info")
+        throw throwable
+    }
+  }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala
new file mode 100644
index 0000000000..d9d80f4298
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.scheduler
+
+import org.apache.kylin.common.KapConfig
+import org.apache.kylin.engine.spark.job.KylinBuildEnv
+import org.apache.kylin.engine.spark.utils.ThreadUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
+
+
+class ClusterMonitor extends Logging {
+  private lazy val scheduler = //
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("connect-master-guard")
+  private val JOB_STEP_PREFIX = "job_step_"
+
+  def scheduleAtFixedRate(func: () => Unit, period: Long): Unit = {
+    scheduler.scheduleAtFixedRate(new Runnable {
+      override def run(): Unit = func.apply()
+    }, period, period, TimeUnit.SECONDS)
+  }
+
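+  // Periodically probes the cluster manager; after the configured number of consecutive failures
+  // the Spark master is marked unreachable and the SparkSession is stopped.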
+  def monitorSparkMaster(atomicBuildEnv: AtomicReference[KylinBuildEnv], atomicSparkSession: AtomicReference[SparkSession],
+                         disconnectTimes: AtomicLong, atomicUnreachableSparkMaster: AtomicBoolean): Unit = {
+    val config = atomicBuildEnv.get().kylinConfig
+    if (KapConfig.wrap(config).isCloud && !config.isUTEnv) {
+      val disconnectMaxTimes = config.getClusterManagerHealthCheckMaxTimes
+      if (disconnectMaxTimes >= 0) {
+        val connectPeriod = config.getClusterManagerHealCheckIntervalSecond
+        logDebug(s"ClusterMonitor thread started with max times $disconnectMaxTimes and period $connectPeriod")
+        scheduleAtFixedRate(() => {
+          monitor(atomicBuildEnv, atomicSparkSession, disconnectTimes, atomicUnreachableSparkMaster)
+        }, connectPeriod)
+      }
+    }
+  }
+
+  def monitor(atomicBuildEnv: AtomicReference[KylinBuildEnv], atomicSparkSession: AtomicReference[SparkSession],
+              disconnectTimes: AtomicLong, atomicUnreachableSparkMaster: AtomicBoolean): Unit = {
+    logDebug("monitor start")
+    val config = atomicBuildEnv.get().kylinConfig
+    val disconnectMaxTimes = config.getClusterManagerHealthCheckMaxTimes
+    try {
+      val clusterManager = atomicBuildEnv.get.clusterManager
+      clusterManager.applicationExisted(JOB_STEP_PREFIX + atomicBuildEnv.get().buildJobInfos.getJobStepId)
+      disconnectTimes.set(0)
+      atomicUnreachableSparkMaster.set(false)
+      logDebug("monitor stop")
+    } catch {
+      case e: Exception =>
+        logError(s"monitor error with : ${e.getMessage}")
+        if (disconnectTimes.incrementAndGet() >= disconnectMaxTimes && atomicSparkSession.get() != null) {
+          logDebug(s"Job will stop: unable to connect to the Spark master within the maximum number of health checks")
+          atomicUnreachableSparkMaster.set(true)
+          atomicSparkSession.get().stop()
+        }
+    }
+  }
+
+  def shutdown(): Unit = {
+    scheduler.shutdownNow()
+  }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala
new file mode 100644
index 0000000000..ee72d16e51
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.scheduler
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.kylin.engine.spark.utils.ThreadUtils
+
+class JobRuntime(val maxThreadCount: Int) {
+
+  private lazy val minThreads = 1
+  private lazy val maxThreads = Math.max(minThreads, maxThreadCount)
+  // Maybe we should parameterize nThreads.
+  private lazy val threadPool = //
+    ThreadUtils.newDaemonScalableThreadPool("build-thread", //
+      minThreads, maxThreads, 20, TimeUnit.SECONDS)
+
+  // Drain layout result using single thread.
+  private lazy val scheduler = //
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("build-scheduler")
+
+  def submit(fun: () => Unit): Unit = {
+    threadPool.submit(new Runnable {
+      override def run(): Unit = fun.apply()
+    })
+  }
+
+  def scheduleCheckpoint(fun: () => Unit): Unit = {
+    scheduler.scheduleWithFixedDelay(() => fun.apply(), 30L, 30L, TimeUnit.SECONDS)
+  }
+
+  def schedule(func: () => Unit, delay: Long, unit: TimeUnit): Unit = {
+    scheduler.schedule(new Runnable {
+      override def run(): Unit = func.apply()
+    }, delay, unit)
+  }
+
+  def shutdown(): Unit = {
+    scheduler.shutdownNow()
+    threadPool.shutdownNow()
+  }
+
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala
new file mode 100644
index 0000000000..d0de86c938
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.scheduler
+
+sealed trait KylinJobEvent
+
+sealed trait JobStatus extends KylinJobEvent
+
+case class JobSucceeded() extends JobStatus
+
+case class JobFailed(reason: String, throwable: Throwable) extends JobStatus
+
+sealed trait JobEndReason extends KylinJobEvent
+
+sealed trait JobFailedReason extends JobEndReason
+
+case class ResourceLack(throwable: Throwable) extends JobFailedReason
+
+case class ExceedMaxRetry(throwable: Throwable) extends JobFailedReason
+
+case class UnknownThrowable(throwable: Throwable) extends JobFailedReason
+
+
+sealed trait JobCommand extends KylinJobEvent
+
+case class RunJob() extends JobCommand
+
+
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala
new file mode 100644
index 0000000000..686a183c32
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.scheduler
+
+trait KylinJobListener {
+  def onReceive(event: KylinJobEvent): Unit
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala
new file mode 100644
index 0000000000..98a5d7e5da
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.application
+
+import org.apache.kylin.engine.spark.job.KylinBuildEnv
+import org.apache.kylin.engine.spark.scheduler._
+import io.netty.util.internal.ThrowableUtil
+import org.apache.kylin.common.util.Unsafe
+import org.apache.spark.autoheal.ExceptionTerminator
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.KylinJobEventLoop
+
+class JobMonitor(eventLoop: KylinJobEventLoop) extends Logging {
+  var retryTimes = 0
+  eventLoop.registerListener(new KylinJobListener {
+    override def onReceive(event: KylinJobEvent): Unit = {
+      event match {
+        case rl: ResourceLack => handleResourceLack(rl)
+        case ut: UnknownThrowable => handleUnknownThrowable(ut)
+        case emr: ExceedMaxRetry => handleExceedMaxRetry(emr)
+        case _ =>
+      }
+    }
+  })
+
+  def stop(): Unit = {
+  }
+
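+  // On a resource shortage: fail fast if the job was killed or a NoRetryException occurred,
+  // otherwise retry via ExceptionTerminator until the configured maximum retry count is exceeded.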
+  def handleResourceLack(rl: ResourceLack): Unit = {
+    try {
+      if (rl.throwable != null && rl.throwable.getCause != null) {
+        if (rl.throwable.getCause.isInstanceOf[NoRetryException]) {
+          eventLoop.post(JobFailed(rl.throwable.getCause.getMessage, rl.throwable.getCause))
+          return
+        }
+      }
+
+      logInfo(s"handleResourceLack --> ${rl.throwable.getCause}")
+      val buildEnv = KylinBuildEnv.get()
+      // if killed, job failed without retry
+      val jobStepId = buildEnv.buildJobInfos.getJobStepId
+      if (buildEnv.clusterManager.isApplicationBeenKilled(jobStepId)) {
+        eventLoop.post(JobFailed(s"Submitted application $jobStepId has been killed.", rl.throwable))
+        return
+      }
+      retryTimes += 1
+      KylinBuildEnv.get().buildJobInfos.recordRetryTimes(retryTimes)
+      val maxRetry = buildEnv.kylinConfig.getSparkEngineMaxRetryTime
+      if (retryTimes <= maxRetry) {
+        logError(s"Job failed $retryTimes time(s). Cause:", rl.throwable)
+        Unsafe.setProperty("kylin.spark-conf.auto-prior", "false")
+        ExceptionTerminator.resolveException(rl, eventLoop)
+      } else {
+        eventLoop.post(ExceedMaxRetry(rl.throwable))
+      }
+    } catch {
+      case throwable: Throwable => eventLoop.post(JobFailed("Error occurred when generating retry configuration.", throwable))
+    }
+  }
+
+
+  def handleExceedMaxRetry(emr: ExceedMaxRetry): Unit = {
+    eventLoop.post(JobFailed("Retry times exceed MaxRetry set in the KylinConfig.", emr.throwable))
+  }
+
+  def handleUnknownThrowable(ur: UnknownThrowable): Unit = {
+    eventLoop.post(JobFailed("Unknown error occurred during the job.", ur.throwable))
+  }
+}
+
+case class RetryInfo(overrideConf: java.util.Map[String, String], throwable: Throwable) {
+  override def toString: String = {
+    s"""RetryInfo{
+       |    overrideConf : $overrideConf,
+       |    throwable : ${ThrowableUtil.stackTraceToString(throwable)}
+       |}""".stripMargin
+  }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
new file mode 100644
index 0000000000..1837b92fa0
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.application
+
+import io.kyligence.kap.engine.spark.job.ParamsConstants
+import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.kylin.common.util.{JsonUtil, Unsafe}
+import org.apache.kylin.engine.spark.application.SparkApplication
+import org.apache.kylin.engine.spark.job.KylinBuildEnv
+import org.apache.kylin.engine.spark.scheduler._
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.KylinJobEventLoop
+
+import java.util
+import java.util.concurrent.CountDownLatch
+
+/**
+ * Spark driver side: constructs and runs the real Spark job ([[SparkApplication]]).
+ */
+object JobWorkSpace extends Logging {
+  def execute(args: Array[String]): Unit = {
+    try {
+      val (application, appArgs) = resolveArgs(args)
+      val eventLoop = new KylinJobEventLoop
+      val worker = new JobWorker(application, appArgs, eventLoop)
+      val monitor = new JobMonitor(eventLoop)
+      val workspace = new JobWorkSpace(eventLoop, monitor, worker)
+      val statusCode = workspace.run()
+      if (statusCode != 0) {
+        Unsafe.systemExit(statusCode)
+      }
+    } catch {
+      case throwable: Throwable =>
+        logError("Error occurred when init job workspace.", throwable)
+        Unsafe.systemExit(1)
+    }
+  }
+
+  def resolveArgs(args: Array[String]): (SparkApplication, Array[String]) = {
+    if (args.length < 2 || args(0) != "-className") throw new IllegalArgumentException("className is required")
+    val className = args(1)
+    // scalastyle:off
+    val o = Class.forName(className).newInstance
+    // scalastyle:on
+    if (!o.isInstanceOf[SparkApplication]) throw new IllegalArgumentException(className + " is not a subclass of SparkApplication")
+    val appArgs = args.slice(2, args.length)
+    val application = o.asInstanceOf[SparkApplication]
+    (application, appArgs)
+  }
+}
+
+class JobWorkSpace(eventLoop: KylinJobEventLoop, monitor: JobMonitor, worker: JobWorker) extends Logging {
+  require(eventLoop != null)
+  require(monitor != null)
+  require(worker != null)
+
+  private var statusCode: Int = 0
+  private val latch = new CountDownLatch(1)
+
+  eventLoop.registerListener(new KylinJobListener {
+    override def onReceive(event: KylinJobEvent): Unit = {
+      event match {
+        case _: JobSucceeded => success()
+        case jf: JobFailed => fail(jf)
+        case _ =>
+      }
+    }
+  })
+
+  def run(): Int = {
+    eventLoop.start()
+    eventLoop.post(RunJob())
+    latch.await()
+    statusCode
+  }
+
+  def success(): Unit = {
+    try {
+      stop()
+    } finally {
+      statusCode = 0
+      latch.countDown()
+    }
+  }
+
+  def fail(jf: JobFailed): Unit = {
+    try {
+      logError(s"Job failed eventually. Reason: ${jf.reason}", jf.throwable)
+      KylinBuildEnv.get().buildJobInfos.recordJobRetryInfos(RetryInfo(new util.HashMap, jf.throwable))
+      updateJobErrorInfo(jf)
+      stop()
+    } finally {
+      statusCode = 1
+      latch.countDown()
+    }
+  }
+
+  def stop(): Unit = {
+    monitor.stop()
+    worker.stop()
+    eventLoop.stop()
+  }
+
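+  // Reports the failure details (step, segment, stack trace, reason) back to the Kylin job server's error endpoint.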
+  def updateJobErrorInfo(jf: JobFailed): Unit = {
+    val infos = KylinBuildEnv.get().buildJobInfos
+    val context = worker.getApplication
+
+    val project = context.getProject
+    val jobId = context.getJobId
+
+    val stageId = infos.getStageId
+    val jobStepId = StringUtils.replace(infos.getJobStepId, SparkApplication.JOB_NAME_PREFIX, "")
+    val failedStepId = if (StringUtils.isBlank(stageId)) jobStepId else stageId
+
+    val failedSegmentId = infos.getSegmentId
+    val failedStack = ExceptionUtils.getStackTrace(jf.throwable)
+    val failedReason =
+      if (context.getAtomicUnreachableSparkMaster.get()) "Unable to connect to the Spark master within the maximum number of health checks"
+      else jf.reason
+    val url = "/kylin/api/jobs/error"
+
+    val payload: util.HashMap[String, Object] = new util.HashMap[String, Object](5)
+    payload.put("project", project)
+    payload.put("job_id", jobId)
+    payload.put("failed_step_id", failedStepId)
+    payload.put("failed_segment_id", failedSegmentId)
+    payload.put("failed_stack", failedStack)
+    payload.put("failed_reason", failedReason)
+    val json = JsonUtil.writeValueAsString(payload)
+    val params = new util.HashMap[String, String]()
+    val config = KylinBuildEnv.get().kylinConfig
+    params.put(ParamsConstants.TIME_OUT, config.getUpdateJobInfoTimeout.toString)
+    params.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true))
+    context.getReport.updateSparkJobInfo(params, url, json);
+  }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
new file mode 100644
index 0000000000..069ac171cc
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.application
+
+import java.util.concurrent.Executors
+
+import org.apache.kylin.engine.spark.application.SparkApplication
+import org.apache.kylin.engine.spark.scheduler._
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.KylinJobEventLoop
+
+class JobWorker(application: SparkApplication, args: Array[String], eventLoop: KylinJobEventLoop) extends Logging {
+  private val pool = Executors.newSingleThreadExecutor()
+
+  def getApplication: SparkApplication = application
+
+  eventLoop.registerListener(new KylinJobListener {
+    override def onReceive(event: KylinJobEvent): Unit = {
+      event match {
+        case _: RunJob => runJob()
+        case _ =>
+      }
+    }
+  })
+
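+  // Shut down the worker thread immediately and log the final job info.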
+  def stop(): Unit = {
+    pool.shutdownNow()
+    application.logJobInfo()
+  }
+
+  private def runJob(): Unit = {
+    execute()
+  }
+
+
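+  // Run the application asynchronously and translate the outcome into job events:
+  // success posts JobSucceeded, a NoRetryException posts UnknownThrowable, any other failure posts ResourceLack.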
+  private def execute(): Unit = {
+    pool.execute(new Runnable {
+      override def run(): Unit = {
+        try {
+          application.execute(args)
+          eventLoop.post(JobSucceeded())
+        } catch {
+          case exception: NoRetryException => eventLoop.post(UnknownThrowable(exception))
+          case throwable: Throwable => eventLoop.post(ResourceLack(throwable))
+        }
+      }
+    })
+  }
+}
+
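+// Marks a failure that should not be retried; JobWorker posts it as UnknownThrowable rather than ResourceLack.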
+class NoRetryException(msg: String) extends java.lang.Exception(msg) {
+  def this() {
+    this(null)
+  }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala
new file mode 100644
index 0000000000..5cefd24054
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.util
+
+import org.apache.kylin.cluster.{AvailableResource, IClusterManager, ResourceInfo}
+import org.apache.kylin.common.KylinConfig
+import org.apache.spark.deploy.DeployMessages.{KillApplication, MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.master.StandaloneClusterManager.masterEndpoints
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcAddress, RpcEnv}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.Utils
+import org.apache.spark.{SecurityManager, SparkConf}
+
+// scalastyle:off
+class StandaloneClusterManager extends IClusterManager with Logging {
+
+  private val JOB_STEP_PREFIX = "job_step_"
+
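+  // Sum the free memory and cores of all ALIVE workers reported by the standalone master.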
+  override def fetchMaximumResourceAllocation: ResourceInfo = {
+    val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState)
+    val aliveWorkers = state.workers.filter(_.state == WorkerState.ALIVE)
+    val availableMem = aliveWorkers.map(_.memoryFree).sum
+    val availableCores = aliveWorkers.map(_.coresFree).sum
+    logInfo(s"Get available resource, " +
+      s"availableMem: $availableMem, availableCores: $availableCores")
+    ResourceInfo(availableMem, availableCores)
+  }
+
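+  // Standalone mode has no queue concept; the queueName argument is ignored and cluster-wide resources are returned.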
+  override def fetchQueueAvailableResource(queueName: String): AvailableResource = {
+    val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState)
+    val aliveWorkers = state.workers.filter(_.state == WorkerState.ALIVE)
+    val availableMem = aliveWorkers.map(_.memoryFree).sum
+    val availableCores = aliveWorkers.map(_.coresFree).sum
+    val totalMem = aliveWorkers.map(_.memory).sum
+    val totalCores = aliveWorkers.map(_.cores).sum
+    logInfo(s"Get available resource, " +
+      s"availableMem: $availableMem, availableCores: $availableCores, " +
+      s"totalMem: $totalMem, totalCores: $totalCores")
+    AvailableResource(ResourceInfo(availableMem, availableCores), ResourceInfo(totalMem, totalCores))
+  }
+
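+  // Look up the active application by its id and return the Spark UI URL, or null if the application is not active.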
+  override def getBuildTrackingUrl(sparkSession: SparkSession): String = {
+    val applicationId = sparkSession.sparkContext.applicationId
+    logInfo(s"Get tracking url of application $applicationId")
+    val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState)
+    val app = state.activeApps.find(_.id == applicationId).orNull
+    if (app == null) {
+      logInfo(s"No active application found of applicationId $applicationId")
+      return null
+    }
+    app.desc.appUiUrl
+  }
+
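+  // Kill the active application whose name is the job step prefix followed by the job step id.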
+  override def killApplication(jobStepId: String): Unit = {
+    killApplication(s"$JOB_STEP_PREFIX", jobStepId)
+  }
+
+  override def killApplication(jobStepPrefix: String, jobStepId: String): Unit = {
+    val master = masterEndpoints(0)
+    val state = master.askSync[MasterStateResponse](RequestMasterState)
+    val app = state.activeApps.find(_.desc.name.equals(jobStepPrefix + jobStepId)).orNull
+    if (app == null) {
+      logInfo(s"No active application found of jobStepId $jobStepId")
+      return
+    }
+    logInfo(s"Kill application ${app.id} by jobStepId $jobStepId")
+    master.send(KillApplication(app.id))
+  }
+
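+  // Standalone mode has no queues, so the names of all active applications are returned.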
+  override def getRunningJobs(queues: util.Set[String]): util.List[String] = {
+    val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState)
+    val jobStepNames = state.activeApps.map(_.desc.name)
+    logInfo(s"Get running jobs ${jobStepNames.toSeq}")
+    import scala.collection.JavaConverters._
+    jobStepNames.toList.asJava
+  }
+
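+  // Queue statistics fall back to the cluster-wide maximum resource allocation in standalone mode.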
+  override def fetchQueueStatistics(queueName: String): ResourceInfo = {
+    fetchMaximumResourceAllocation
+  }
+
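+  // Check the completed applications for a matching job step that ended in the KILLED state.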
+  override def isApplicationBeenKilled(jobStepId: String): Boolean = {
+    val master = masterEndpoints(0)
+    val state = master.askSync[MasterStateResponse](RequestMasterState)
+    val app = state.completedApps.find(_.desc.name.equals(s"$JOB_STEP_PREFIX$jobStepId")).orNull
+    if (app == null) {
+      false
+    } else {
+      "KILLED".equals(app.state.toString)
+    }
+  }
+
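+  // True if an active application with the given name exists (case-insensitive).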
+  override def applicationExisted(jobId: String): Boolean = {
+    val master = masterEndpoints(0)
+    val state = master.askSync[MasterStateResponse](RequestMasterState)
+    state.activeApps.exists(_.desc.name.equalsIgnoreCase(jobId))
+  }
+
+}
+
+object StandaloneClusterManager extends Logging {
+
+  private val ENDPOINT_NAME = "Master"
+  private val CLIENT_NAME = "kylinStandaloneClient"
+  private val SPARK_LOCAL = "local"
+  private val SPARK_MASTER = "spark.master"
+  private val SPARK_RPC_TIMEOUT = "spark.rpc.askTimeout"
+  private val SPARK_AUTHENTICATE = "spark.authenticate"
+  private val SPARK_AUTHENTICATE_SECRET = "spark.authenticate.secret"
+  private val SPARK_NETWORK_CRYPTO_ENABLED = "spark.network.crypto.enabled"
+
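+  // Create a client RpcEnv and resolve an endpoint reference for every configured standalone master URL.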
+  private lazy val masterEndpoints = {
+    val overrideConfig = KylinConfig.getInstanceFromEnv.getSparkConfigOverride
+    val conf = new SparkConf()
+    if (!conf.contains(SPARK_MASTER) || conf.get(SPARK_MASTER).startsWith(SPARK_LOCAL)) {
+      conf.set(SPARK_MASTER, overrideConfig.get(SPARK_MASTER))
+    }
+    if (!conf.contains(SPARK_RPC_TIMEOUT)) {
+      conf.set(SPARK_RPC_TIMEOUT, "10s")
+    }
+    if (overrideConfig.containsKey(SPARK_AUTHENTICATE) && "true".equals(overrideConfig.get(SPARK_AUTHENTICATE))) {
+      conf.set(SPARK_AUTHENTICATE, "true")
+      conf.set(SPARK_AUTHENTICATE_SECRET, overrideConfig.getOrDefault(SPARK_AUTHENTICATE_SECRET, "kylin"))
+      conf.set(SPARK_NETWORK_CRYPTO_ENABLED, overrideConfig.getOrDefault(SPARK_NETWORK_CRYPTO_ENABLED, "true"))
+    }
+    logInfo(s"Spark master ${conf.get(SPARK_MASTER)}")
+    val rpcEnv = RpcEnv.create(CLIENT_NAME, Utils.localHostName(), 0, conf, new SecurityManager(conf))
+    val masterUrls = conf.get(SPARK_MASTER)
+    Utils.parseStandaloneMasterUrls(masterUrls)
+      .map(RpcAddress.fromSparkURL)
+      .map(rpcEnv.setupEndpointRef(_, ENDPOINT_NAME))
+  }
+}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala
new file mode 100644
index 0000000000..38f1554a01
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util
+import java.util.{List => JList}
+
+import org.apache.kylin.engine.spark.scheduler.{KylinJobEvent, KylinJobListener}
+import org.apache.spark.util.EventLoop
+
+import scala.collection.JavaConverters._
+
+class KylinJobEventLoop extends EventLoop[KylinJobEvent]("spark-entry-event-loop") {
+  // Listeners are registered from a single thread, so a plain LinkedList is sufficient.
+  private val listeners: JList[KylinJobListener] = new util.LinkedList[KylinJobListener]()
+
+  def registerListener(listener: KylinJobListener): Boolean = {
+    listeners.add(listener)
+  }
+
+  def unregisterListener(listener: KylinJobListener): Boolean = {
+    listeners.remove(listener)
+  }
+
+  override protected def onReceive(event: KylinJobEvent): Unit = {
+    listeners.asScala.foreach(_.onReceive(event))
+  }
+
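+  // Errors raised while dispatching events are not propagated.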
+  override protected def onError(e: Throwable): Unit = {
+
+  }
+}
