This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new aa462c78fe KYLIN-5217 Create a initial commit
aa462c78fe is described below

commit aa462c78fec33127427ce3246fe5500e9af7077d
Author: longfei.jiang <longfei.ji...@kyligence.io>
AuthorDate: Tue Aug 9 14:05:41 2022 +0800

    KYLIN-5217 Create a initial commit
---
 .../kylin/cluster/ClusterManagerFactory.scala      |  37 ----
 .../apache/kylin/cluster/K8sClusterManager.scala   |  65 -------
 .../kylin/cluster/SchedulerInfoCmdHelper.scala     | 167 -----------------
 .../kylin/cluster/StandaloneClusterManager.scala   |  22 ---
 .../apache/kylin/cluster/YarnClusterManager.scala  | 206 ---------------------
 .../cluster/parser/CapacitySchedulerParser.scala   |  92 ---------
 .../kylin/cluster/parser/FairSchedulerParser.scala |  64 -------
 .../kylin/cluster/parser/SchedulerParser.scala     |  43 -----
 .../cluster/parser/SchedulerParserFactory.scala    |  43 -----
 .../engine/spark/scheduler/ClusterMonitor.scala    |  82 --------
 .../kylin/engine/spark/scheduler/JobRuntime.scala  |  59 ------
 .../engine/spark/scheduler/KylinJobEvent.scala     |  44 -----
 .../engine/spark/scheduler/KylinJobListener.scala  |  23 ---
 .../org/apache/spark/application/JobMonitor.scala  |  94 ----------
 .../apache/spark/application/JobWorkSpace.scala    | 153 ---------------
 .../org/apache/spark/application/JobWorker.scala   |  71 -------
 .../deploy/master/StandaloneClusterManager.scala   | 157 ----------------
 .../apache/spark/scheduler/KylinJobEventLoop.scala |  48 -----
 18 files changed, 1470 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala
deleted file mode 100644
index 6009232f57..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cluster
-
-import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
-import io.kyligence.kap.guava20.shaded.common.util.concurrent.SimpleTimeLimiter
-import org.apache.kylin.common.KylinConfig
-import org.apache.spark.util.KylinReflectUtils
-
-
-object ClusterManagerFactory {
-
-  val pool: ExecutorService = Executors.newCachedThreadPool
-
-  def create(kylinConfig: KylinConfig): IClusterManager = {
-    val timeLimiter = SimpleTimeLimiter.create(pool)
-    val target = KylinReflectUtils.createObject(kylinConfig.getClusterManagerClassName)._1.asInstanceOf[IClusterManager]
-
-    timeLimiter.newProxy(target, classOf[IClusterManager], kylinConfig.getClusterManagerTimeoutThreshold, TimeUnit.SECONDS);
-  }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala
deleted file mode 100644
index 8711622885..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.cluster
-
-import java.util
-
-import com.google.common.collect.Lists
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-
-class K8sClusterManager extends IClusterManager with Logging {
-  override def fetchMaximumResourceAllocation: ResourceInfo = {
-    ResourceInfo(Int.MaxValue, 1000)
-  }
-
-  override def fetchQueueAvailableResource(queueName: String): AvailableResource = {
-    AvailableResource(ResourceInfo(Int.MaxValue, 1000), ResourceInfo(Int.MaxValue, 1000))
-  }
-
-  override def getBuildTrackingUrl(sparkSession: SparkSession): String = {
-    logInfo("Get Build Tracking Url!")
-    val applicationId = sparkSession.sparkContext.applicationId
-    ""
-  }
-
-  override def killApplication(jobStepId: String): Unit = {
-    logInfo("Kill Application $jobStepId !")
-  }
-
-  override def killApplication(jobStepPrefix: String, jobStepId: String): Unit = {
-    logInfo("Kill Application $jobStepPrefix $jobStepId !")
-  }
-
-  override def isApplicationBeenKilled(applicationId: String): Boolean = {
-    true
-  }
-
-  override def getRunningJobs(queues: util.Set[String]): util.List[String] = {
-    Lists.newArrayList()
-  }
-
-  override def fetchQueueStatistics(queueName: String): ResourceInfo = {
-    ResourceInfo(Int.MaxValue, 1000)
-  }
-
-  override def applicationExisted(jobId: String): Boolean = {
-    false
-  }
-
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala
deleted file mode 100644
index 95c35a8a75..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cluster
-
-import org.apache.kylin.engine.spark.utils.StorageUtils
-import io.netty.util.internal.ThrowableUtil
-import org.apache.hadoop.yarn.conf.{HAUtil, YarnConfiguration}
-import org.apache.kylin.common.util.{JsonUtil, ShellException}
-import org.apache.spark.internal.Logging
-
-import java.io.{BufferedReader, InputStreamReader}
-import java.nio.charset.StandardCharsets
-
-object SchedulerInfoCmdHelper extends Logging {
-  private val useHttps: Boolean = YarnConfiguration.useHttps(StorageUtils.getCurrentYarnConfiguration)
-
-  def schedulerInfo: String = {
-    val cmds = getSocketAddress.map(address => genCmd(address._1, address._2))
-    getInfoByCmds(cmds)
-  }
-
-  def metricsInfo: String = {
-    val cmds = getSocketAddress.map(address => genMetricsCmd(address._1, address._2))
-    getInfoByCmds(cmds)
-  }
-
-  private[cluster] def getInfoByCmds(cmds: Iterable[String]): String = {
-    val results = cmds.map { cmd =>
-      try {
-        logInfo(s"Executing the command: ${cmd}")
-        execute(cmd)
-      } catch {
-        case throwable: Throwable => (1, ThrowableUtil.stackTraceToString(throwable))
-      }
-    }
-    val tuples = results.filter(result => result._1 == 0 && JsonUtil.isJson(result._2))
-    if (tuples.isEmpty) {
-      val errors = tuples.map(_._2).mkString("\n")
-      logWarning(s"Error occurred when get scheduler info from cmd $cmds")
-      throw new RuntimeException(errors)
-    } else {
-      require(tuples.size == 1)
-      tuples.head._2
-    }
-  }
-
-  private[cluster] def getSocketAddress: Map[String, Int] = {
-    val conf = StorageUtils.getCurrentYarnConfiguration
-    val addresses = if (HAUtil.isHAEnabled(conf)) {
-      val haIds = HAUtil.getRMHAIds(conf).toArray
-      require(haIds.nonEmpty, "Ha ids is empty, please check your yarn-site.xml.")
-      if (useHttps) {
-        haIds.map(id => conf.getSocketAddr(s"${YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS}.$id",
-          YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_HTTPS_PORT))
-      } else {
-        haIds.map(id => conf.getSocketAddr(s"${YarnConfiguration.RM_WEBAPP_ADDRESS}.$id",
-          YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_PORT))
-      }
-    } else {
-      if (useHttps) {
-        Array(conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS,
-          YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_HTTPS_PORT))
-      } else {
-        Array(conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS,
-          YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_PORT))
-      }
-    }
-    addresses.map(address => address.getHostName -> address.getPort).toMap
-  }
-
-  private[cluster] def genCmd(hostName: String, port: Int): String = {
-    val uri = if (useHttps) {
-      s"https://$hostName:$port/ws/v1/cluster/scheduler";
-    } else {
-      s"http://$hostName:$port/ws/v1/cluster/scheduler";
-    }
-    s"""curl -k --negotiate -u : "$uri""""
-  }
-
-  private[cluster] def genMetricsCmd(hostName: String, port: Int): String = {
-    val uri = if (useHttps) {
-      s"https://$hostName:$port/ws/v1/cluster/metrics";
-    } else {
-      s"http://$hostName:$port/ws/v1/cluster/metrics";
-    }
-    s"""curl -k --negotiate -u : "$uri""""
-  }
-
-  /**
-   * only return std out after execute command
-   *
-   * @param command
-   * @return
-   */
-  private[cluster] def execute(command: String): (Int, String) = {
-    try {
-      val cmd = new Array[String](3)
-      val osName = System.getProperty("os.name")
-      if (osName.startsWith("Windows")) {
-        cmd(0) = "cmd.exe"
-        cmd(1) = "/C"
-      }
-      else {
-        cmd(0) = "/bin/bash"
-        cmd(1) = "-c"
-      }
-      cmd(2) = command
-      val builder = new ProcessBuilder(cmd: _*)
-      builder.environment().putAll(System.getenv())
-      val proc = builder.start
-      val resultStdout = new StringBuilder
-      val inReader = new BufferedReader(new InputStreamReader(proc.getInputStream, StandardCharsets.UTF_8.name()))
-      val newLine = System.getProperty("line.separator")
-      var line: String = inReader.readLine()
-      while (line != null) {
-        resultStdout.append(line).append(newLine)
-        line = inReader.readLine()
-      }
-
-      val stderr = new StringBuilder
-      val errorReader = new BufferedReader(new InputStreamReader(proc.getErrorStream, StandardCharsets.UTF_8.name()))
-      line = errorReader.readLine()
-      while (line != null) {
-        stderr.append(line).append(newLine)
-        line = errorReader.readLine()
-      }
-
-      logInfo(s"The corresponding http response for the above command: \n 
${stderr}")
-      try {
-        val exitCode = proc.waitFor
-        if (exitCode != 0) {
-          logError(s"executing command $command; exit code: $exitCode")
-          logError(s"==========================[stderr]===============================")
-          logError(stderr.toString)
-          logError(s"==========================[stderr]===============================")
-
-          logError(s"==========================[stdout]===============================")
-          logError(resultStdout.toString)
-          logError(s"==========================[stdout]===============================")
-        }
-        (exitCode, resultStdout.toString)
-      } catch {
-        case e: InterruptedException =>
-          Thread.currentThread.interrupt()
-          throw e
-      }
-    } catch {
-      case e: Exception => throw new ShellException(e)
-    }
-  }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala
deleted file mode 100644
index c3737a9090..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cluster
-
-class StandaloneClusterManager extends org.apache.spark.deploy.master.StandaloneClusterManager {
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala
deleted file mode 100644
index c3f6275a0a..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cluster
-
-import java.io.IOException
-import java.util
-import java.util.stream.Collectors
-import java.util.{ArrayList, EnumSet, List, Set}
-
-import com.google.common.collect.{Lists, Sets}
-import org.apache.kylin.cluster.parser.SchedulerParserFactory
-import org.apache.kylin.engine.spark.utils.StorageUtils
-import org.apache.commons.collections.CollectionUtils
-import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, QueueInfo, YarnApplicationState}
-import org.apache.hadoop.yarn.client.api.YarnClient
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.exceptions.YarnException
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.JavaConverters._
-
-class YarnClusterManager extends IClusterManager with Logging {
-
-  import org.apache.kylin.cluster.YarnClusterManager._
-
-  private lazy val maximumResourceAllocation: ResourceInfo = {
-    withYarnClient(yarnClient => {
-      try {
-        val response = yarnClient.createApplication().getNewApplicationResponse
-        val cap = response.getMaximumResourceCapability
-        val resourceInfo = ResourceInfo(cap.getMemory, cap.getVirtualCores)
-        logInfo(s"Cluster maximum resource allocation $resourceInfo")
-        resourceInfo
-      } catch {
-        case throwable: Throwable =>
-          logError("Error occurred when get resource upper limit per 
container.", throwable)
-          throw throwable
-      }
-    })
-  }
-
-  override def fetchMaximumResourceAllocation: ResourceInfo = maximumResourceAllocation
-
-  override def fetchQueueAvailableResource(queueName: String): AvailableResource = {
-    val info = SchedulerInfoCmdHelper.schedulerInfo
-    val parser = SchedulerParserFactory.create(info)
-    parser.parse(info)
-    parser.availableResource(queueName)
-  }
-
-  def getYarnApplicationReport(applicationId: String): ApplicationReport = {
-    withYarnClient(yarnClient => {
-      val array = applicationId.split("_")
-      if (array.length < 3) return null
-      val appId = ApplicationId.newInstance(array(1).toLong, array(2).toInt)
-      yarnClient.getApplicationReport(appId)
-    })
-  }
-
-  override def getBuildTrackingUrl(sparkSession: SparkSession): String = {
-    val applicationId = sparkSession.sparkContext.applicationId
-    val applicationReport = getYarnApplicationReport(applicationId)
-    if (null == applicationReport) null
-    else applicationReport.getTrackingUrl
-  }
-
-  def killApplication(jobStepId: String): Unit = {
-    killApplication("job_step_", jobStepId)
-  }
-
-  def killApplication(jobStepPrefix: String, jobStepId: String): Unit = {
-    withYarnClient(yarnClient => {
-      var orphanApplicationId: String = null
-      try {
-        val types: Set[String] = Sets.newHashSet("SPARK")
-        val states: EnumSet[YarnApplicationState] = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
-          YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING)
-        val applicationReports: List[ApplicationReport] = yarnClient.getApplications(types, states)
-        if (CollectionUtils.isEmpty(applicationReports)) return
-        import scala.collection.JavaConverters._
-        for (report <- applicationReports.asScala) {
-          if (report.getName.equalsIgnoreCase(jobStepPrefix + jobStepId)) {
-            orphanApplicationId = report.getApplicationId.toString
-            yarnClient.killApplication(report.getApplicationId)
-            logInfo(s"kill orphan yarn application $orphanApplicationId 
succeed, job step $jobStepId")
-          }
-        }
-      } catch {
-        case ex@(_: YarnException | _: IOException) =>
-          logError(s"kill orphan yarn application $orphanApplicationId failed, 
job step $jobStepId", ex)
-      }
-    })
-  }
-
-  def getRunningJobs(queues: Set[String]): List[String] = {
-    withYarnClient(yarnClient => {
-      if (queues.isEmpty) {
-        val applications = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING))
-        if (null == applications) new ArrayList[String]()
-        else applications.asScala.map(_.getName).asJava
-      } else {
-        val runningJobs: List[String] = new ArrayList[String]()
-        for (queue <- queues.asScala) {
-          val applications = yarnClient.getQueueInfo(queue).getApplications
-          if (null != applications) {
-            applications.asScala.filter(_.getYarnApplicationState == YarnApplicationState.RUNNING).map(_.getName).map(runningJobs.add)
-          }
-        }
-        runningJobs
-      }
-    })
-  }
-
-  override def fetchQueueStatistics(queueName: String): ResourceInfo = {
-    withYarnClient(yarnClient => {
-      val qs = yarnClient.getQueueInfo(queueName).getQueueStatistics
-      ResourceInfo(qs.getAvailableMemoryMB.toInt, qs.getAvailableVCores.toInt)
-    })
-  }
-
-  override def isApplicationBeenKilled(jobStepId: String): Boolean = {
-    withYarnClient(yarnClient => {
-      val types: Set[String] = Sets.newHashSet("SPARK")
-      val states: EnumSet[YarnApplicationState] = EnumSet.of(YarnApplicationState.KILLED)
-      val applicationReports: List[ApplicationReport] = yarnClient.getApplications(types, states)
-      if (!CollectionUtils.isEmpty(applicationReports)) {
-        import scala.collection.JavaConverters._
-        for (report <- applicationReports.asScala) {
-          if (report.getName.equalsIgnoreCase("job_step_" + jobStepId)) {
-            return true
-          }
-        }
-      }
-      false
-    })
-  }
-
-  def applicationExisted(jobId: String): Boolean = {
-    withYarnClient(yarnClient => {
-      val types: util.Set[String] = Sets.newHashSet("SPARK")
-      val states: util.EnumSet[YarnApplicationState] = util.EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
-        YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING)
-      val applicationReports: util.List[ApplicationReport] = yarnClient.getApplications(types, states)
-      if (!CollectionUtils.isEmpty(applicationReports)) {
-        import scala.collection.JavaConverters._
-        for (report <- applicationReports.asScala) {
-          if (report.getName.equalsIgnoreCase(jobId)) {
-            return true
-          }
-        }
-      }
-      return false
-    })
-  }
-
-  def listQueueNames(): util.List[String] = {
-    withYarnClient(yarnClient => {
-      val queues: util.List[QueueInfo] = yarnClient.getAllQueues
-      val queueNames: util.List[String] = Lists.newArrayList()
-      for (queue <- queues.asScala) {
-        queueNames.add(queue.getQueueName)
-      }
-      queueNames
-    })
-  }
-
-}
-
-object YarnClusterManager {
-
-  def withYarnClient[T](body: YarnClient => T): T = {
-    val yarnClient = YarnClient.createYarnClient
-    yarnClient.init(getSpecifiedConf)
-    yarnClient.start()
-    try {
-      body(yarnClient)
-    } finally {
-      yarnClient.close()
-    }
-  }
-
-  private def getSpecifiedConf: YarnConfiguration = {
-    // yarn cluster mode couldn't access local hadoop_conf_dir
-    val yarnConf = StorageUtils.getCurrentYarnConfiguration
-    // https://issues.apache.org/jira/browse/SPARK-15343
-    yarnConf.set("yarn.timeline-service.enabled", "false")
-    yarnConf
-  }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala
deleted file mode 100644
index e22700c7e7..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cluster.parser
-
-import java.util.{List => JList}
-import com.fasterxml.jackson.databind.JsonNode
-import org.apache.kylin.cluster.{AvailableResource, ResourceInfo}
-import org.apache.kylin.engine.spark.application.SparkApplication
-import org.apache.kylin.engine.spark.job.KylinBuildEnv
-
-import scala.collection.JavaConverters._
-
-class CapacitySchedulerParser extends SchedulerParser {
-
-  override def availableResource(queueName: String): AvailableResource = {
-    val queues: JList[JsonNode] = root.findParents("queueName")
-    val nodes = queues.asScala.filter(queue => parseValue(queue.get("queueName")).equals(queueName))
-    require(nodes.size == 1)
-
-    var (queueAvailable, queueMax) = queueCapacity(nodes.head)
-    val totalResource = calTotalResource(nodes.head)
-    val clusterNode = root.findValue("schedulerInfo")
-    val cluster = clusterAvailableCapacity(clusterNode)
-    var min = Math.min(queueAvailable, cluster)
-    logInfo(s"queueAvailable is ${queueAvailable}, min is ${min}, queueMax is 
${queueMax}")
-    if (KylinBuildEnv.get().kylinConfig.useDynamicResourcePlan() && queueMax 
== 0.0) {
-      logInfo("configure yarn queue using dynamic resource plan in capacity 
scheduler")
-      queueMax = 1.0
-    }
-    if (KylinBuildEnv.get().kylinConfig.useDynamicResourcePlan() && min == 
0.0) {
-      logInfo("configure yarn queue using dynamic resource plan in capacity 
scheduler")
-      min = 1.0
-    }
-    var resource = AvailableResource(totalResource.percentage(min), 
totalResource.percentage(queueMax))
-    try {
-      val queueAvailableRes = 
KylinBuildEnv.get.clusterManager.fetchQueueStatistics(queueName)
-      resource = AvailableResource(queueAvailableRes, 
totalResource.percentage(queueMax))
-    } catch {
-      case e: Error =>
-        logInfo(s"The current hadoop version does not support 
QueueInfo.getQueueStatistics method.")
-        None
-    }
-
-    logInfo(s"Capacity actual available resource: $resource.")
-    resource
-  }
-
-  private def clusterAvailableCapacity(node: JsonNode): Double = {
-    val max = parseValue(node.get("capacity")).toDouble
-    val used = parseValue(node.get("usedCapacity")).toDouble
-    val capacity = (max - used) / 100
-    logInfo(s"Cluster available capacity: $capacity.")
-    capacity
-  }
-
-  private def queueCapacity(node: JsonNode): (Double, Double) = {
-    val max = parseValue(node.get("absoluteMaxCapacity")).toDouble
-    val used = parseValue(node.get("absoluteUsedCapacity")).toDouble
-    val available = (max - used) / 100
-    logInfo(s"Queue available capacity: $available.")
-    (available, max / 100)
-  }
-
-  private def calTotalResource(node: JsonNode): ResourceInfo = {
-    val usedMemory = parseValue(node.get("resourcesUsed").get("memory")).toInt
-    if (usedMemory != 0) {
-      val usedCapacity = parseValue(node.get("absoluteUsedCapacity")).toDouble / 100
-      val resource = ResourceInfo(Math.floor(usedMemory / usedCapacity).toInt, Int.MaxValue)
-      logInfo(s"Estimate total cluster resource is $resource.")
-      resource
-    } else {
-      logInfo("Current queue used memory is 0, seem available resource as infinite.")
-      ResourceInfo(Int.MaxValue, Int.MaxValue)
-    }
-  }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala
deleted file mode 100644
index f2c9125191..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cluster.parser
-
-import java.util.{List => JList}
-
-import com.fasterxml.jackson.databind.JsonNode
-import org.apache.kylin.cluster.{AvailableResource, ResourceInfo}
-
-import scala.collection.JavaConverters._
-
-class FairSchedulerParser extends SchedulerParser {
-
-  override def availableResource(queueName: String): AvailableResource = {
-    val cluster = clusterAvailableResource()
-    val queue = queueAvailableResource(queueName)
-    val resource = AvailableResource(cluster.available.reduceMin(queue.available), queue.max)
-    logInfo(s"Current actual available resource: $resource.")
-    resource
-  }
-
-  private def queueAvailableResource(queueName: String): AvailableResource = {
-    val queues: JList[JsonNode] = root.findParents("queueName")
-    val nodes = queues.asScala.filter(queue => parseValue(queue.get("queueName")).equals(queueName))
-    require(nodes.size == 1)
-    val resource = calAvailableResource(nodes.head)
-    logInfo(s"Queue available resource: $resource.")
-    resource
-  }
-
-  private def clusterAvailableResource(): AvailableResource = {
-    val node = root.findValue("rootQueue")
-    require(node != null)
-    val resource = calAvailableResource(node)
-    logInfo(s"Cluster available resource: $resource.")
-    resource
-  }
-
-  private def calAvailableResource(node: JsonNode): AvailableResource = {
-    val maxResources = node.get("maxResources")
-    val maxMemory = parseValue(maxResources.get("memory")).toInt
-    val maxCores = parseValue(maxResources.get("vCores")).toInt
-    val usedResources = node.get("usedResources")
-    val usedMemory = parseValue(usedResources.get("memory")).toInt
-    val usedCores = parseValue(usedResources.get("vCores")).toInt
-    AvailableResource(ResourceInfo(maxMemory - usedMemory, maxCores - usedCores), ResourceInfo(maxMemory, maxCores))
-  }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala
deleted file mode 100644
index 9db043abc7..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cluster.parser
-
-import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
-import org.apache.kylin.cluster.AvailableResource
-import org.apache.spark.internal.Logging
-
-trait SchedulerParser extends Logging {
-  protected var root: JsonNode = _
-  protected lazy val mapper = new ObjectMapper
-
-  def availableResource(queueName: String): AvailableResource
-
-  def parse(schedulerInfo: String): Unit = {
-    this.root = mapper.readTree(schedulerInfo)
-  }
-
-  // value in some scheduler info format to "value"
-  protected def parseValue(node: JsonNode): String = {
-    if (node.toString.startsWith("\"")) {
-      node.toString.replace("\"", "")
-    } else {
-      node.toString
-    }
-  }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala
deleted file mode 100644
index 315ae3155d..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cluster.parser
-
-import org.apache.kylin.common.util.JsonUtil
-import org.apache.spark.internal.Logging
-
-object SchedulerParserFactory extends Logging{
-  def create(info: String): SchedulerParser = {
-    try {
-      val schedulerType = JsonUtil.readValueAsTree(info).findValue("type").toString
-      // call contains rather than equals cause of value is surrounded with ", for example: {"type":"fairScheduler"}
-      // do not support FifoScheduler for now
-      if (schedulerType.contains("capacityScheduler")) {
-        new CapacitySchedulerParser
-      } else if (schedulerType.contains("fairScheduler")) {
-        new FairSchedulerParser
-      } else {
-        throw new IllegalArgumentException(s"Unsupported scheduler type from scheduler info. $schedulerType")
-      }
-    } catch {
-      case throwable: Throwable =>
-        logError(s"Invalid scheduler info. $info")
-        throw throwable
-    }
-  }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala
deleted file mode 100644
index d9d80f4298..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.scheduler
-
-import org.apache.kylin.common.KapConfig
-import org.apache.kylin.engine.spark.job.KylinBuildEnv
-import org.apache.kylin.engine.spark.utils.ThreadUtils
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
-
-
-class ClusterMonitor extends Logging {
-  private lazy val scheduler = //
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("connect-master-guard")
-  private val JOB_STEP_PREFIX = "job_step_"
-
-  def scheduleAtFixedRate(func: () => Unit, period: Long): Unit = {
-    scheduler.scheduleAtFixedRate(new Runnable {
-      override def run(): Unit = func.apply()
-    }, period, period, TimeUnit.SECONDS)
-  }
-
-  def monitorSparkMaster(atomicBuildEnv: AtomicReference[KylinBuildEnv], atomicSparkSession: AtomicReference[SparkSession],
-                         disconnectTimes: AtomicLong, atomicUnreachableSparkMaster: AtomicBoolean): Unit = {
-    val config = atomicBuildEnv.get().kylinConfig
-    if (KapConfig.wrap(config).isCloud && !config.isUTEnv) {
-      val disconnectMaxTimes = config.getClusterManagerHealthCheckMaxTimes
-      if (disconnectMaxTimes >= 0) {
-        val connectPeriod = config.getClusterManagerHealCheckIntervalSecond
-        logDebug(s"ClusterMonitor thread start with max times is 
$disconnectMaxTimes period is $connectPeriod")
-        scheduleAtFixedRate(() => {
-          monitor(atomicBuildEnv, atomicSparkSession, disconnectTimes, 
atomicUnreachableSparkMaster)
-        }, connectPeriod)
-      }
-    }
-  }
-
-  def monitor(atomicBuildEnv: AtomicReference[KylinBuildEnv], atomicSparkSession: AtomicReference[SparkSession],
-              disconnectTimes: AtomicLong, atomicUnreachableSparkMaster: AtomicBoolean): Unit = {
-    logDebug("monitor start")
-    val config = atomicBuildEnv.get().kylinConfig
-    val disconnectMaxTimes = config.getClusterManagerHealthCheckMaxTimes
-    try {
-      val clusterManager = atomicBuildEnv.get.clusterManager
-      clusterManager.applicationExisted(JOB_STEP_PREFIX + atomicBuildEnv.get().buildJobInfos.getJobStepId)
-      disconnectTimes.set(0)
-      atomicUnreachableSparkMaster.set(false)
-      logDebug("monitor stop")
-    } catch {
-      case e: Exception =>
-        logError(s"monitor error with : ${e.getMessage}")
-        if (disconnectTimes.incrementAndGet() >= disconnectMaxTimes && atomicSparkSession.get() != null) {
-          logDebug(s"Job will stop: Unable connect spark master to reach timeout maximum time")
-          atomicUnreachableSparkMaster.set(true)
-          atomicSparkSession.get().stop()
-        }
-    }
-  }
-
-  def shutdown(): Unit = {
-    scheduler.shutdownNow()
-  }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala
deleted file mode 100644
index ee72d16e51..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.scheduler
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.kylin.engine.spark.utils.ThreadUtils
-
-class JobRuntime(val maxThreadCount: Int) {
-
-  private lazy val minThreads = 1
-  private lazy val maxThreads = Math.max(minThreads, maxThreadCount)
-  // Maybe we should parameterize nThreads.
-  private lazy val threadPool = //
-    ThreadUtils.newDaemonScalableThreadPool("build-thread", //
-      minThreads, maxThreads, 20, TimeUnit.SECONDS)
-
-  // Drain layout result using single thread.
-  private lazy val scheduler = //
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("build-scheduler")
-
-  def submit(fun: () => Unit): Unit = {
-    threadPool.submit(new Runnable {
-      override def run(): Unit = fun.apply()
-    })
-  }
-
-  def scheduleCheckpoint(fun: () => Unit): Unit = {
-    scheduler.scheduleWithFixedDelay(() => fun.apply(), 30L, 30L, TimeUnit.SECONDS)
-  }
-
-  def schedule(func: () => Unit, delay: Long, unit: TimeUnit): Unit = {
-    scheduler.schedule(new Runnable {
-      override def run(): Unit = func.apply()
-    }, delay, unit)
-  }
-
-  def shutdown(): Unit = {
-    scheduler.shutdownNow()
-    threadPool.shutdownNow()
-  }
-
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala
deleted file mode 100644
index d0de86c938..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.scheduler
-
-sealed trait KylinJobEvent
-
-sealed trait JobStatus extends KylinJobEvent
-
-case class JobSucceeded() extends JobStatus
-
-case class JobFailed(reason: String, throwable: Throwable) extends JobStatus
-
-sealed trait JobEndReason extends KylinJobEvent
-
-sealed trait JobFailedReason extends JobEndReason
-
-case class ResourceLack(throwable: Throwable) extends JobFailedReason
-
-case class ExceedMaxRetry(throwable: Throwable) extends JobFailedReason
-
-case class UnknownThrowable(throwable: Throwable) extends JobFailedReason
-
-
-sealed trait JobCommand extends KylinJobEvent
-
-case class RunJob() extends JobCommand
-
-
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala
deleted file mode 100644
index 686a183c32..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.scheduler
-
-trait KylinJobListener {
-  def onReceive(event: KylinJobEvent): Unit
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala
deleted file mode 100644
index 98a5d7e5da..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.application
-
-import org.apache.kylin.engine.spark.job.KylinBuildEnv
-import org.apache.kylin.engine.spark.scheduler._
-import io.netty.util.internal.ThrowableUtil
-import org.apache.kylin.common.util.Unsafe
-import org.apache.spark.autoheal.ExceptionTerminator
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.KylinJobEventLoop
-
-class JobMonitor(eventLoop: KylinJobEventLoop) extends Logging {
-  var retryTimes = 0
-  eventLoop.registerListener(new KylinJobListener {
-    override def onReceive(event: KylinJobEvent): Unit = {
-      event match {
-        case rl: ResourceLack => handleResourceLack(rl)
-        case ut: UnknownThrowable => handleUnknownThrowable(ut)
-        case emr: ExceedMaxRetry => handleExceedMaxRetry(emr)
-        case _ =>
-      }
-    }
-  })
-
-  def stop(): Unit = {
-  }
-
-  def handleResourceLack(rl: ResourceLack): Unit = {
-    try {
-      if (rl.throwable != null && rl.throwable.getCause != null) {
-        if (rl.throwable.getCause.isInstanceOf[NoRetryException]) {
-          eventLoop.post(JobFailed(rl.throwable.getCause.getMessage, rl.throwable.getCause))
-          return
-        }
-      }
-
-      logInfo(s"handleResourceLack --> ${rl.throwable.getCause}")
-      val buildEnv = KylinBuildEnv.get()
-      // if killed, job failed without retry
-      val jobStepId = buildEnv.buildJobInfos.getJobStepId
-      if (buildEnv.clusterManager.isApplicationBeenKilled(jobStepId)) {
-        eventLoop.post(JobFailed(s"Submitted application $jobStepId has been 
killed.", rl.throwable))
-        return
-      }
-      retryTimes += 1
-      KylinBuildEnv.get().buildJobInfos.recordRetryTimes(retryTimes)
-      val maxRetry = buildEnv.kylinConfig.getSparkEngineMaxRetryTime
-      if (retryTimes <= maxRetry) {
-        logError(s"Job failed $retryTimes time(s). Cause:", rl.throwable)
-        Unsafe.setProperty("kylin.spark-conf.auto-prior", "false")
-        ExceptionTerminator.resolveException(rl, eventLoop)
-      } else {
-        eventLoop.post(ExceedMaxRetry(rl.throwable))
-      }
-    } catch {
-      case throwable: Throwable => eventLoop.post(JobFailed("Error occurred 
when generate retry configuration.", throwable))
-    }
-  }
-
-
-  def handleExceedMaxRetry(emr: ExceedMaxRetry): Unit = {
-    eventLoop.post(JobFailed("Retry times exceed MaxRetry set in the 
KylinConfig.", emr.throwable))
-  }
-
-  def handleUnknownThrowable(ur: UnknownThrowable): Unit = {
-    eventLoop.post(JobFailed("Unknown error occurred during the job.", 
ur.throwable))
-  }
-}
-
-case class RetryInfo(overrideConf: java.util.Map[String, String], throwable: Throwable) {
-  override def toString: String = {
-    s"""RetryInfo{
-       |    overrideConf : $overrideConf,
-       |    throwable : ${ThrowableUtil.stackTraceToString(throwable)}
-       |}""".stripMargin
-  }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
deleted file mode 100644
index 1837b92fa0..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.application
-
-import io.kyligence.kap.engine.spark.job.ParamsConstants
-import org.apache.commons.lang.StringUtils
-import org.apache.commons.lang3.exception.ExceptionUtils
-import org.apache.kylin.common.util.{JsonUtil, Unsafe}
-import org.apache.kylin.engine.spark.application.SparkApplication
-import org.apache.kylin.engine.spark.job.KylinBuildEnv
-import org.apache.kylin.engine.spark.scheduler._
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.KylinJobEventLoop
-
-import java.util
-import java.util.concurrent.CountDownLatch
-
-/**
- * Spark driver part, construct the real spark job [SparkApplication]
- */
-object JobWorkSpace extends Logging {
-  def execute(args: Array[String]): Unit = {
-    try {
-      val (application, appArgs) = resolveArgs(args)
-      val eventLoop = new KylinJobEventLoop
-      val worker = new JobWorker(application, appArgs, eventLoop)
-      val monitor = new JobMonitor(eventLoop)
-      val workspace = new JobWorkSpace(eventLoop, monitor, worker)
-      val statusCode = workspace.run()
-      if (statusCode != 0) {
-        Unsafe.systemExit(statusCode)
-      }
-    } catch {
-      case throwable: Throwable =>
-        logError("Error occurred when init job workspace.", throwable)
-        Unsafe.systemExit(1)
-    }
-  }
-
-  def resolveArgs(args: Array[String]): (SparkApplication, Array[String]) = {
-    if (args.length < 2 || args(0) != "-className") throw new IllegalArgumentException("className is required")
-    val className = args(1)
-    // scalastyle:off
-    val o = Class.forName(className).newInstance
-    // scalastyle:on
-    if (!o.isInstanceOf[SparkApplication]) throw new IllegalArgumentException(className + " is not a subClass of AbstractApplication")
-    val appArgs = args.slice(2, args.length)
-    val application = o.asInstanceOf[SparkApplication]
-    (application, appArgs)
-  }
-}
-
-class JobWorkSpace(eventLoop: KylinJobEventLoop, monitor: JobMonitor, worker: JobWorker) extends Logging {
-  require(eventLoop != null)
-  require(monitor != null)
-  require(worker != null)
-
-  private var statusCode: Int = 0
-  private val latch = new CountDownLatch(1)
-
-  eventLoop.registerListener(new KylinJobListener {
-    override def onReceive(event: KylinJobEvent): Unit = {
-      event match {
-        case _: JobSucceeded => success()
-        case jf: JobFailed => fail(jf)
-        case _ =>
-      }
-    }
-  })
-
-  def run(): Int = {
-    eventLoop.start()
-    eventLoop.post(RunJob())
-    latch.await()
-    statusCode
-  }
-
-  def success(): Unit = {
-    try {
-      stop()
-    } finally {
-      statusCode = 0
-      latch.countDown()
-    }
-  }
-
-  def fail(jf: JobFailed): Unit = {
-    try {
-      logError(s"Job failed eventually. Reason: ${jf.reason}", jf.throwable)
-      KylinBuildEnv.get().buildJobInfos.recordJobRetryInfos(RetryInfo(new util.HashMap, jf.throwable))
-      updateJobErrorInfo(jf)
-      stop()
-    } finally {
-      statusCode = 1
-      latch.countDown()
-    }
-  }
-
-  def stop(): Unit = {
-    monitor.stop()
-    worker.stop()
-    eventLoop.stop()
-  }
-
-  def updateJobErrorInfo(jf: JobFailed): Unit = {
-    val infos = KylinBuildEnv.get().buildJobInfos
-    val context = worker.getApplication
-
-    val project = context.getProject
-    val jobId = context.getJobId
-
-    val stageId = infos.getStageId
-    val jobStepId = StringUtils.replace(infos.getJobStepId, SparkApplication.JOB_NAME_PREFIX, "")
-    val failedStepId = if (StringUtils.isBlank(stageId)) jobStepId else stageId
-
-    val failedSegmentId = infos.getSegmentId
-    val failedStack = ExceptionUtils.getStackTrace(jf.throwable)
-    val failedReason =
-      if (context.getAtomicUnreachableSparkMaster.get()) "Unable connect spark master to reach timeout maximum time"
-      else jf.reason
-    val url = "/kylin/api/jobs/error"
-
-    val payload: util.HashMap[String, Object] = new util.HashMap[String, Object](5)
-    payload.put("project", project)
-    payload.put("job_id", jobId)
-    payload.put("failed_step_id", failedStepId)
-    payload.put("failed_segment_id", failedSegmentId)
-    payload.put("failed_stack", failedStack)
-    payload.put("failed_reason", failedReason)
-    val json = JsonUtil.writeValueAsString(payload)
-    val params = new util.HashMap[String, String]()
-    val config = KylinBuildEnv.get().kylinConfig
-    params.put(ParamsConstants.TIME_OUT, config.getUpdateJobInfoTimeout.toString)
-    params.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true))
-    context.getReport.updateSparkJobInfo(params, url, json);
-  }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
deleted file mode 100644
index 069ac171cc..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.application
-
-import java.util.concurrent.Executors
-
-import org.apache.kylin.engine.spark.application.SparkApplication
-import org.apache.kylin.engine.spark.scheduler._
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.KylinJobEventLoop
-
-class JobWorker(application: SparkApplication, args: Array[String], eventLoop: KylinJobEventLoop) extends Logging {
-  private val pool = Executors.newSingleThreadExecutor()
-
-  def getApplication: SparkApplication = application
-
-  eventLoop.registerListener(new KylinJobListener {
-    override def onReceive(event: KylinJobEvent): Unit = {
-      event match {
-        case _: RunJob => runJob()
-        case _ =>
-      }
-    }
-  })
-
-  def stop(): Unit = {
-    pool.shutdownNow()
-    application.logJobInfo()
-  }
-
-  private def runJob(): Unit = {
-    execute()
-  }
-
-
-  private def execute(): Unit = {
-    pool.execute(new Runnable {
-      override def run(): Unit = {
-        try {
-          application.execute(args)
-          eventLoop.post(JobSucceeded())
-        } catch {
-          case exception: NoRetryException => eventLoop.post(UnknownThrowable(exception))
-          case throwable: Throwable => eventLoop.post(ResourceLack(throwable))
-        }
-      }
-    })
-  }
-}
-
-class NoRetryException(msg: String) extends java.lang.Exception(msg) {
-  def this() {
-    this(null)
-  }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala
deleted file mode 100644
index 5cefd24054..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.master
-
-import java.util
-
-import org.apache.kylin.cluster.{AvailableResource, IClusterManager, ResourceInfo}
-import org.apache.kylin.common.KylinConfig
-import org.apache.spark.deploy.DeployMessages.{KillApplication, MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.master.StandaloneClusterManager.masterEndpoints
-import org.apache.spark.internal.Logging
-import org.apache.spark.rpc.{RpcAddress, RpcEnv}
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.Utils
-import org.apache.spark.{SecurityManager, SparkConf}
-
-// scalastyle:off
-class StandaloneClusterManager extends IClusterManager with Logging {
-
-  private val JOB_STEP_PREFIX = "job_step_"
-
-  override def fetchMaximumResourceAllocation: ResourceInfo = {
-    val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState)
-    val aliveWorkers = state.workers.filter(_.state == WorkerState.ALIVE)
-    val availableMem = aliveWorkers.map(_.memoryFree).sum
-    val availableCores = aliveWorkers.map(_.coresFree).sum
-    logInfo(s"Get available resource, " +
-      s"availableMem: $availableMem, availableCores: $availableCores")
-    ResourceInfo(availableMem, availableCores)
-  }
-
-  override def fetchQueueAvailableResource(queueName: String): AvailableResource = {
-    val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState)
-    val aliveWorkers = state.workers.filter(_.state == WorkerState.ALIVE)
-    val availableMem = aliveWorkers.map(_.memoryFree).sum
-    val availableCores = aliveWorkers.map(_.coresFree).sum
-    val totalMem = aliveWorkers.map(_.memory).sum
-    val totalCores = aliveWorkers.map(_.cores).sum
-    logInfo(s"Get available resource, " +
-      s"availableMem: $availableMem, availableCores: $availableCores, " +
-      s"totalMem: $totalMem, totalCores: $totalCores")
-    AvailableResource(ResourceInfo(availableMem, availableCores), ResourceInfo(totalMem, totalCores))
-  }
-
-  override def getBuildTrackingUrl(sparkSession: SparkSession): String = {
-    val applicationId = sparkSession.sparkContext.applicationId
-    logInfo(s"Get tracking url of application $applicationId")
-    val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState)
-    val app = state.activeApps.find(_.id == applicationId).orNull
-    if (app == null) {
-      logInfo(s"No active application found of applicationId $applicationId")
-      return null
-    }
-    app.desc.appUiUrl
-  }
-
-  override def killApplication(jobStepId: String): Unit = {
-    killApplication(s"$JOB_STEP_PREFIX", jobStepId)
-  }
-
-  override def killApplication(jobStepPrefix: String, jobStepId: String): Unit = {
-    val master = masterEndpoints(0)
-    val state = master.askSync[MasterStateResponse](RequestMasterState)
-    val app = state.activeApps.find(_.desc.name.equals(jobStepPrefix + jobStepId)).orNull
-    if (app == null) {
-      logInfo(s"No active application found of jobStepId $jobStepId")
-      return
-    }
-    logInfo(s"Kill application ${app.id} by jobStepId $jobStepId")
-    master.send(KillApplication(app.id))
-  }
-
-  override def getRunningJobs(queues: util.Set[String]): util.List[String] = {
-    val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState)
-    val jobStepNames = state.activeApps.map(_.desc.name)
-    logInfo(s"Get running jobs ${jobStepNames.toSeq}")
-    import scala.collection.JavaConverters._
-    jobStepNames.toList.asJava
-  }
-
-  override def fetchQueueStatistics(queueName: String): ResourceInfo = {
-    fetchMaximumResourceAllocation
-  }
-
-  override def isApplicationBeenKilled(jobStepId: String): Boolean = {
-    val master = masterEndpoints(0)
-    val state = master.askSync[MasterStateResponse](RequestMasterState)
-    val app = state.completedApps.find(_.desc.name.equals(s"$JOB_STEP_PREFIX$jobStepId")).orNull
-    if (app == null) {
-      false
-    } else {
-      "KILLED".equals(app.state.toString)
-    }
-  }
-
-  override def applicationExisted(jobId: String): Boolean = {
-    val master = masterEndpoints(0)
-    val state = master.askSync[MasterStateResponse](RequestMasterState)
-    val app = state.activeApps.find(_.desc.name.equalsIgnoreCase(jobId)).orNull
-    if (app == null) {
-      false
-    } else {
-      true
-    }
-  }
-
-}
-
-object StandaloneClusterManager extends Logging {
-
-  private val ENDPOINT_NAME = "Master"
-  private val CLIENT_NAME = "kylinStandaloneClient"
-  private val SPARK_LOCAL = "local"
-  private val SPARK_MASTER = "spark.master"
-  private val SPARK_RPC_TIMEOUT = "spark.rpc.askTimeout"
-  private val SPARK_AUTHENTICATE = "spark.authenticate"
-  private val SPARK_AUTHENTICATE_SECRET = "spark.authenticate.secret"
-  private val SPARK_NETWORK_CRYPTO_ENABLED = "spark.network.crypto.enabled"
-
-  private lazy val masterEndpoints = {
-    val overrideConfig = KylinConfig.getInstanceFromEnv.getSparkConfigOverride
-    val conf = new SparkConf()
-    if (!conf.contains(SPARK_MASTER) || conf.get(SPARK_MASTER).startsWith(SPARK_LOCAL)) {
-      conf.set(SPARK_MASTER, overrideConfig.get(SPARK_MASTER))
-    }
-    if (!conf.contains(SPARK_RPC_TIMEOUT)) {
-      conf.set(SPARK_RPC_TIMEOUT, "10s")
-    }
-    if (overrideConfig.containsKey(SPARK_AUTHENTICATE) && "true".equals(overrideConfig.get(SPARK_AUTHENTICATE))) {
-      conf.set(SPARK_AUTHENTICATE, "true")
-      conf.set(SPARK_AUTHENTICATE_SECRET, overrideConfig.getOrDefault(SPARK_AUTHENTICATE_SECRET, "kylin"))
-      conf.set(SPARK_NETWORK_CRYPTO_ENABLED, overrideConfig.getOrDefault(SPARK_NETWORK_CRYPTO_ENABLED, "true"))
-    }
-    logInfo(s"Spark master ${conf.get(SPARK_MASTER)}")
-    val rpcEnv = RpcEnv.create(CLIENT_NAME, Utils.localHostName(), 0, conf, new SecurityManager(conf))
-    val masterUrls = conf.get(SPARK_MASTER)
-    Utils.parseStandaloneMasterUrls(masterUrls)
-      .map(RpcAddress.fromSparkURL)
-      .map(rpcEnv.setupEndpointRef(_, ENDPOINT_NAME))
-  }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala
deleted file mode 100644
index 38f1554a01..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler
-
-import java.util
-import java.util.{List => JList}
-
-import org.apache.kylin.engine.spark.scheduler.{KylinJobEvent, KylinJobListener}
-import org.apache.spark.util.EventLoop
-
-import scala.collection.JavaConverters._
-
-class KylinJobEventLoop extends EventLoop[KylinJobEvent]("spark-entry-event-loop") {
-  // register listener in single thread, so LinkedList is enough
-  private var listeners: JList[KylinJobListener] = new util.LinkedList[KylinJobListener]()
-
-  def registerListener(listener: KylinJobListener): Boolean = {
-    listeners.add(listener)
-  }
-
-  def unregisterListener(listener: KylinJobListener): Boolean = {
-    listeners.remove(listener)
-  }
-
-  override protected def onReceive(event: KylinJobEvent): Unit = {
-    listeners.asScala.foreach(_.onReceive(event))
-  }
-
-  override protected def onError(e: Throwable): Unit = {
-
-  }
-}
