This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new aa462c78fe KYLIN-5217 Create an initial commit
aa462c78fe is described below
commit aa462c78fec33127427ce3246fe5500e9af7077d
Author: longfei.jiang <[email protected]>
AuthorDate: Tue Aug 9 14:05:41 2022 +0800
KYLIN-5217 Create an initial commit
---
.../kylin/cluster/ClusterManagerFactory.scala | 37 ----
.../apache/kylin/cluster/K8sClusterManager.scala | 65 -------
.../kylin/cluster/SchedulerInfoCmdHelper.scala | 167 -----------------
.../kylin/cluster/StandaloneClusterManager.scala | 22 ---
.../apache/kylin/cluster/YarnClusterManager.scala | 206 ---------------------
.../cluster/parser/CapacitySchedulerParser.scala | 92 ---------
.../kylin/cluster/parser/FairSchedulerParser.scala | 64 -------
.../kylin/cluster/parser/SchedulerParser.scala | 43 -----
.../cluster/parser/SchedulerParserFactory.scala | 43 -----
.../engine/spark/scheduler/ClusterMonitor.scala | 82 --------
.../kylin/engine/spark/scheduler/JobRuntime.scala | 59 ------
.../engine/spark/scheduler/KylinJobEvent.scala | 44 -----
.../engine/spark/scheduler/KylinJobListener.scala | 23 ---
.../org/apache/spark/application/JobMonitor.scala | 94 ----------
.../apache/spark/application/JobWorkSpace.scala | 153 ---------------
.../org/apache/spark/application/JobWorker.scala | 71 -------
.../deploy/master/StandaloneClusterManager.scala | 157 ----------------
.../apache/spark/scheduler/KylinJobEventLoop.scala | 48 -----
18 files changed, 1470 deletions(-)
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala
deleted file mode 100644
index 6009232f57..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cluster
-
-import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
-import io.kyligence.kap.guava20.shaded.common.util.concurrent.SimpleTimeLimiter
-import org.apache.kylin.common.KylinConfig
-import org.apache.spark.util.KylinReflectUtils
-
-
-object ClusterManagerFactory {
-
- val pool: ExecutorService = Executors.newCachedThreadPool
-
- def create(kylinConfig: KylinConfig): IClusterManager = {
- val timeLimiter = SimpleTimeLimiter.create(pool)
- val target = KylinReflectUtils.createObject(kylinConfig.getClusterManagerClassName)._1.asInstanceOf[IClusterManager]
-
- timeLimiter.newProxy(target, classOf[IClusterManager], kylinConfig.getClusterManagerTimeoutThreshold, TimeUnit.SECONDS);
- }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala
deleted file mode 100644
index 8711622885..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.cluster
-
-import java.util
-
-import com.google.common.collect.Lists
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-
-class K8sClusterManager extends IClusterManager with Logging {
- override def fetchMaximumResourceAllocation: ResourceInfo = {
- ResourceInfo(Int.MaxValue, 1000)
- }
-
- override def fetchQueueAvailableResource(queueName: String): AvailableResource = {
- AvailableResource(ResourceInfo(Int.MaxValue, 1000), ResourceInfo(Int.MaxValue, 1000))
- }
-
- override def getBuildTrackingUrl(sparkSession: SparkSession): String = {
- logInfo("Get Build Tracking Url!")
- val applicationId = sparkSession.sparkContext.applicationId
- ""
- }
-
- override def killApplication(jobStepId: String): Unit = {
- logInfo("Kill Application $jobStepId !")
- }
-
- override def killApplication(jobStepPrefix: String, jobStepId: String): Unit = {
- logInfo("Kill Application $jobStepPrefix $jobStepId !")
- }
-
- override def isApplicationBeenKilled(applicationId: String): Boolean = {
- true
- }
-
- override def getRunningJobs(queues: util.Set[String]): util.List[String] = {
- Lists.newArrayList()
- }
-
- override def fetchQueueStatistics(queueName: String): ResourceInfo = {
- ResourceInfo(Int.MaxValue, 1000)
- }
-
- override def applicationExisted(jobId: String): Boolean = {
- false
- }
-
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala
deleted file mode 100644
index 95c35a8a75..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cluster
-
-import org.apache.kylin.engine.spark.utils.StorageUtils
-import io.netty.util.internal.ThrowableUtil
-import org.apache.hadoop.yarn.conf.{HAUtil, YarnConfiguration}
-import org.apache.kylin.common.util.{JsonUtil, ShellException}
-import org.apache.spark.internal.Logging
-
-import java.io.{BufferedReader, InputStreamReader}
-import java.nio.charset.StandardCharsets
-
-object SchedulerInfoCmdHelper extends Logging {
- private val useHttps: Boolean = YarnConfiguration.useHttps(StorageUtils.getCurrentYarnConfiguration)
-
- def schedulerInfo: String = {
- val cmds = getSocketAddress.map(address => genCmd(address._1, address._2))
- getInfoByCmds(cmds)
- }
-
- def metricsInfo: String = {
- val cmds = getSocketAddress.map(address => genMetricsCmd(address._1, address._2))
- getInfoByCmds(cmds)
- }
-
- private[cluster] def getInfoByCmds(cmds: Iterable[String]): String = {
- val results = cmds.map { cmd =>
- try {
- logInfo(s"Executing the command: ${cmd}")
- execute(cmd)
- } catch {
- case throwable: Throwable => (1, ThrowableUtil.stackTraceToString(throwable))
- }
- }
- val tuples = results.filter(result => result._1 == 0 && JsonUtil.isJson(result._2))
- if (tuples.isEmpty) {
- val errors = tuples.map(_._2).mkString("\n")
- logWarning(s"Error occurred when get scheduler info from cmd $cmds")
- throw new RuntimeException(errors)
- } else {
- require(tuples.size == 1)
- tuples.head._2
- }
- }
-
- private[cluster] def getSocketAddress: Map[String, Int] = {
- val conf = StorageUtils.getCurrentYarnConfiguration
- val addresses = if (HAUtil.isHAEnabled(conf)) {
- val haIds = HAUtil.getRMHAIds(conf).toArray
- require(haIds.nonEmpty, "Ha ids is empty, please check your yarn-site.xml.")
- if (useHttps) {
- haIds.map(id => conf.getSocketAddr(s"${YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS}.$id",
- YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_HTTPS_PORT))
- } else {
- haIds.map(id => conf.getSocketAddr(s"${YarnConfiguration.RM_WEBAPP_ADDRESS}.$id",
- YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_PORT))
- }
- } else {
- if (useHttps) {
- Array(conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS,
- YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_HTTPS_PORT))
- } else {
- Array(conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS,
- YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_PORT))
- }
- }
- addresses.map(address => address.getHostName -> address.getPort).toMap
- }
-
- private[cluster] def genCmd(hostName: String, port: Int): String = {
- val uri = if (useHttps) {
- s"https://$hostName:$port/ws/v1/cluster/scheduler"
- } else {
- s"http://$hostName:$port/ws/v1/cluster/scheduler"
- }
- s"""curl -k --negotiate -u : "$uri""""
- }
-
- private[cluster] def genMetricsCmd(hostName: String, port: Int): String = {
- val uri = if (useHttps) {
- s"https://$hostName:$port/ws/v1/cluster/metrics"
- } else {
- s"http://$hostName:$port/ws/v1/cluster/metrics"
- }
- s"""curl -k --negotiate -u : "$uri""""
- }
-
- /**
- * only return std out after execute command
- *
- * @param command
- * @return
- */
- private[cluster] def execute(command: String): (Int, String) = {
- try {
- val cmd = new Array[String](3)
- val osName = System.getProperty("os.name")
- if (osName.startsWith("Windows")) {
- cmd(0) = "cmd.exe"
- cmd(1) = "/C"
- }
- else {
- cmd(0) = "/bin/bash"
- cmd(1) = "-c"
- }
- cmd(2) = command
- val builder = new ProcessBuilder(cmd: _*)
- builder.environment().putAll(System.getenv())
- val proc = builder.start
- val resultStdout = new StringBuilder
- val inReader = new BufferedReader(new InputStreamReader(proc.getInputStream, StandardCharsets.UTF_8.name()))
- val newLine = System.getProperty("line.separator")
- var line: String = inReader.readLine()
- while (line != null) {
- resultStdout.append(line).append(newLine)
- line = inReader.readLine()
- }
-
- val stderr = new StringBuilder
- val errorReader = new BufferedReader(new InputStreamReader(proc.getErrorStream, StandardCharsets.UTF_8.name()))
- line = errorReader.readLine()
- while (line != null) {
- stderr.append(line).append(newLine)
- line = errorReader.readLine()
- }
-
- logInfo(s"The corresponding http response for the above command: \n ${stderr}")
- try {
- val exitCode = proc.waitFor
- if (exitCode != 0) {
- logError(s"executing command $command; exit code: $exitCode")
- logError(s"==========================[stderr]===============================")
- logError(stderr.toString)
- logError(s"==========================[stderr]===============================")
-
- logError(s"==========================[stdout]===============================")
- logError(resultStdout.toString)
- logError(s"==========================[stdout]===============================")
- }
- (exitCode, resultStdout.toString)
- } catch {
- case e: InterruptedException =>
- Thread.currentThread.interrupt()
- throw e
- }
- } catch {
- case e: Exception => throw new ShellException(e)
- }
- }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala
deleted file mode 100644
index c3737a9090..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cluster
-
-class StandaloneClusterManager extends org.apache.spark.deploy.master.StandaloneClusterManager {
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala
deleted file mode 100644
index c3f6275a0a..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cluster
-
-import java.io.IOException
-import java.util
-import java.util.stream.Collectors
-import java.util.{ArrayList, EnumSet, List, Set}
-
-import com.google.common.collect.{Lists, Sets}
-import org.apache.kylin.cluster.parser.SchedulerParserFactory
-import org.apache.kylin.engine.spark.utils.StorageUtils
-import org.apache.commons.collections.CollectionUtils
-import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, QueueInfo, YarnApplicationState}
-import org.apache.hadoop.yarn.client.api.YarnClient
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.exceptions.YarnException
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.JavaConverters._
-
-class YarnClusterManager extends IClusterManager with Logging {
-
- import org.apache.kylin.cluster.YarnClusterManager._
-
- private lazy val maximumResourceAllocation: ResourceInfo = {
- withYarnClient(yarnClient => {
- try {
- val response = yarnClient.createApplication().getNewApplicationResponse
- val cap = response.getMaximumResourceCapability
- val resourceInfo = ResourceInfo(cap.getMemory, cap.getVirtualCores)
- logInfo(s"Cluster maximum resource allocation $resourceInfo")
- resourceInfo
- } catch {
- case throwable: Throwable =>
- logError("Error occurred when get resource upper limit per container.", throwable)
- throw throwable
- }
- })
- }
-
- override def fetchMaximumResourceAllocation: ResourceInfo = maximumResourceAllocation
-
- override def fetchQueueAvailableResource(queueName: String): AvailableResource = {
- val info = SchedulerInfoCmdHelper.schedulerInfo
- val parser = SchedulerParserFactory.create(info)
- parser.parse(info)
- parser.availableResource(queueName)
- }
-
- def getYarnApplicationReport(applicationId: String): ApplicationReport = {
- withYarnClient(yarnClient => {
- val array = applicationId.split("_")
- if (array.length < 3) return null
- val appId = ApplicationId.newInstance(array(1).toLong, array(2).toInt)
- yarnClient.getApplicationReport(appId)
- })
- }
-
- override def getBuildTrackingUrl(sparkSession: SparkSession): String = {
- val applicationId = sparkSession.sparkContext.applicationId
- val applicationReport = getYarnApplicationReport(applicationId)
- if (null == applicationReport) null
- else applicationReport.getTrackingUrl
- }
-
- def killApplication(jobStepId: String): Unit = {
- killApplication("job_step_", jobStepId)
- }
-
- def killApplication(jobStepPrefix: String, jobStepId: String): Unit = {
- withYarnClient(yarnClient => {
- var orphanApplicationId: String = null
- try {
- val types: Set[String] = Sets.newHashSet("SPARK")
- val states: EnumSet[YarnApplicationState] = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
- YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING)
- val applicationReports: List[ApplicationReport] = yarnClient.getApplications(types, states)
- if (CollectionUtils.isEmpty(applicationReports)) return
- import scala.collection.JavaConverters._
- for (report <- applicationReports.asScala) {
- if (report.getName.equalsIgnoreCase(jobStepPrefix + jobStepId)) {
- orphanApplicationId = report.getApplicationId.toString
- yarnClient.killApplication(report.getApplicationId)
- logInfo(s"kill orphan yarn application $orphanApplicationId succeed, job step $jobStepId")
- }
- }
- } catch {
- case ex@(_: YarnException | _: IOException) =>
- logError(s"kill orphan yarn application $orphanApplicationId failed, job step $jobStepId", ex)
- }
- })
- }
-
- def getRunningJobs(queues: Set[String]): List[String] = {
- withYarnClient(yarnClient => {
- if (queues.isEmpty) {
- val applications = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING))
- if (null == applications) new ArrayList[String]()
- else applications.asScala.map(_.getName).asJava
- } else {
- val runningJobs: List[String] = new ArrayList[String]()
- for (queue <- queues.asScala) {
- val applications = yarnClient.getQueueInfo(queue).getApplications
- if (null != applications) {
- applications.asScala.filter(_.getYarnApplicationState == YarnApplicationState.RUNNING).map(_.getName).map(runningJobs.add)
- }
- }
- runningJobs
- }
- })
- }
-
- override def fetchQueueStatistics(queueName: String): ResourceInfo = {
- withYarnClient(yarnClient => {
- val qs = yarnClient.getQueueInfo(queueName).getQueueStatistics
- ResourceInfo(qs.getAvailableMemoryMB.toInt, qs.getAvailableVCores.toInt)
- })
- }
-
- override def isApplicationBeenKilled(jobStepId: String): Boolean = {
- withYarnClient(yarnClient => {
- val types: Set[String] = Sets.newHashSet("SPARK")
- val states: EnumSet[YarnApplicationState] = EnumSet.of(YarnApplicationState.KILLED)
- val applicationReports: List[ApplicationReport] = yarnClient.getApplications(types, states)
- if (!CollectionUtils.isEmpty(applicationReports)) {
- import scala.collection.JavaConverters._
- for (report <- applicationReports.asScala) {
- if (report.getName.equalsIgnoreCase("job_step_" + jobStepId)) {
- return true
- }
- }
- }
- false
- })
- }
-
- def applicationExisted(jobId: String): Boolean = {
- withYarnClient(yarnClient => {
- val types: util.Set[String] = Sets.newHashSet("SPARK")
- val states: util.EnumSet[YarnApplicationState] = util.EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
- YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING)
- val applicationReports: util.List[ApplicationReport] = yarnClient.getApplications(types, states)
- if (!CollectionUtils.isEmpty(applicationReports)) {
- import scala.collection.JavaConverters._
- for (report <- applicationReports.asScala) {
- if (report.getName.equalsIgnoreCase(jobId)) {
- return true
- }
- }
- }
- return false
- })
- }
-
- def listQueueNames(): util.List[String] = {
- withYarnClient(yarnClient => {
- val queues: util.List[QueueInfo] = yarnClient.getAllQueues
- val queueNames: util.List[String] = Lists.newArrayList()
- for (queue <- queues.asScala) {
- queueNames.add(queue.getQueueName)
- }
- queueNames
- })
- }
-
-}
-
-object YarnClusterManager {
-
- def withYarnClient[T](body: YarnClient => T): T = {
- val yarnClient = YarnClient.createYarnClient
- yarnClient.init(getSpecifiedConf)
- yarnClient.start()
- try {
- body(yarnClient)
- } finally {
- yarnClient.close()
- }
- }
-
- private def getSpecifiedConf: YarnConfiguration = {
- // yarn cluster mode couldn't access local hadoop_conf_dir
- val yarnConf = StorageUtils.getCurrentYarnConfiguration
- // https://issues.apache.org/jira/browse/SPARK-15343
- yarnConf.set("yarn.timeline-service.enabled", "false")
- yarnConf
- }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala
deleted file mode 100644
index e22700c7e7..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cluster.parser
-
-import java.util.{List => JList}
-import com.fasterxml.jackson.databind.JsonNode
-import org.apache.kylin.cluster.{AvailableResource, ResourceInfo}
-import org.apache.kylin.engine.spark.application.SparkApplication
-import org.apache.kylin.engine.spark.job.KylinBuildEnv
-
-import scala.collection.JavaConverters._
-
-class CapacitySchedulerParser extends SchedulerParser {
-
- override def availableResource(queueName: String): AvailableResource = {
- val queues: JList[JsonNode] = root.findParents("queueName")
- val nodes = queues.asScala.filter(queue => parseValue(queue.get("queueName")).equals(queueName))
- require(nodes.size == 1)
-
- var (queueAvailable, queueMax) = queueCapacity(nodes.head)
- val totalResource = calTotalResource(nodes.head)
- val clusterNode = root.findValue("schedulerInfo")
- val cluster = clusterAvailableCapacity(clusterNode)
- var min = Math.min(queueAvailable, cluster)
- logInfo(s"queueAvailable is ${queueAvailable}, min is ${min}, queueMax is ${queueMax}")
- if (KylinBuildEnv.get().kylinConfig.useDynamicResourcePlan() && queueMax == 0.0) {
- logInfo("configure yarn queue using dynamic resource plan in capacity scheduler")
- queueMax = 1.0
- }
- if (KylinBuildEnv.get().kylinConfig.useDynamicResourcePlan() && min == 0.0) {
- logInfo("configure yarn queue using dynamic resource plan in capacity scheduler")
- min = 1.0
- }
- var resource = AvailableResource(totalResource.percentage(min), totalResource.percentage(queueMax))
- try {
- val queueAvailableRes = KylinBuildEnv.get.clusterManager.fetchQueueStatistics(queueName)
- resource = AvailableResource(queueAvailableRes, totalResource.percentage(queueMax))
- } catch {
- case e: Error =>
- logInfo(s"The current hadoop version does not support QueueInfo.getQueueStatistics method.")
- None
- }
-
- logInfo(s"Capacity actual available resource: $resource.")
- resource
- }
-
- private def clusterAvailableCapacity(node: JsonNode): Double = {
- val max = parseValue(node.get("capacity")).toDouble
- val used = parseValue(node.get("usedCapacity")).toDouble
- val capacity = (max - used) / 100
- logInfo(s"Cluster available capacity: $capacity.")
- capacity
- }
-
- private def queueCapacity(node: JsonNode): (Double, Double) = {
- val max = parseValue(node.get("absoluteMaxCapacity")).toDouble
- val used = parseValue(node.get("absoluteUsedCapacity")).toDouble
- val available = (max - used) / 100
- logInfo(s"Queue available capacity: $available.")
- (available, max / 100)
- }
-
- private def calTotalResource(node: JsonNode): ResourceInfo = {
- val usedMemory = parseValue(node.get("resourcesUsed").get("memory")).toInt
- if (usedMemory != 0) {
- val usedCapacity = parseValue(node.get("absoluteUsedCapacity")).toDouble / 100
- val resource = ResourceInfo(Math.floor(usedMemory / usedCapacity).toInt, Int.MaxValue)
- logInfo(s"Estimate total cluster resource is $resource.")
- resource
- } else {
- logInfo("Current queue used memory is 0, seem available resource as infinite.")
- ResourceInfo(Int.MaxValue, Int.MaxValue)
- }
- }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala
deleted file mode 100644
index f2c9125191..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cluster.parser
-
-import java.util.{List => JList}
-
-import com.fasterxml.jackson.databind.JsonNode
-import org.apache.kylin.cluster.{AvailableResource, ResourceInfo}
-
-import scala.collection.JavaConverters._
-
-class FairSchedulerParser extends SchedulerParser {
-
- override def availableResource(queueName: String): AvailableResource = {
- val cluster = clusterAvailableResource()
- val queue = queueAvailableResource(queueName)
- val resource = AvailableResource(cluster.available.reduceMin(queue.available), queue.max)
- logInfo(s"Current actual available resource: $resource.")
- resource
- }
-
- private def queueAvailableResource(queueName: String): AvailableResource = {
- val queues: JList[JsonNode] = root.findParents("queueName")
- val nodes = queues.asScala.filter(queue => parseValue(queue.get("queueName")).equals(queueName))
- require(nodes.size == 1)
- val resource = calAvailableResource(nodes.head)
- logInfo(s"Queue available resource: $resource.")
- resource
- }
-
- private def clusterAvailableResource(): AvailableResource = {
- val node = root.findValue("rootQueue")
- require(node != null)
- val resource = calAvailableResource(node)
- logInfo(s"Cluster available resource: $resource.")
- resource
- }
-
- private def calAvailableResource(node: JsonNode): AvailableResource = {
- val maxResources = node.get("maxResources")
- val maxMemory = parseValue(maxResources.get("memory")).toInt
- val maxCores = parseValue(maxResources.get("vCores")).toInt
- val usedResources = node.get("usedResources")
- val usedMemory = parseValue(usedResources.get("memory")).toInt
- val usedCores = parseValue(usedResources.get("vCores")).toInt
- AvailableResource(ResourceInfo(maxMemory - usedMemory, maxCores - usedCores), ResourceInfo(maxMemory, maxCores))
- }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala
deleted file mode 100644
index 9db043abc7..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cluster.parser
-
-import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
-import org.apache.kylin.cluster.AvailableResource
-import org.apache.spark.internal.Logging
-
-trait SchedulerParser extends Logging {
- protected var root: JsonNode = _
- protected lazy val mapper = new ObjectMapper
-
- def availableResource(queueName: String): AvailableResource
-
- def parse(schedulerInfo: String): Unit = {
- this.root = mapper.readTree(schedulerInfo)
- }
-
- // value in some scheduler info format to "value"
- protected def parseValue(node: JsonNode): String = {
- if (node.toString.startsWith("\"")) {
- node.toString.replace("\"", "")
- } else {
- node.toString
- }
- }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala
deleted file mode 100644
index 315ae3155d..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.cluster.parser
-
-import org.apache.kylin.common.util.JsonUtil
-import org.apache.spark.internal.Logging
-
-object SchedulerParserFactory extends Logging{
- def create(info: String): SchedulerParser = {
- try {
- val schedulerType = JsonUtil.readValueAsTree(info).findValue("type").toString
- // call contains rather than equals cause of value is surrounded with ", for example: {"type":"fairScheduler"}
- // do not support FifoScheduler for now
- if (schedulerType.contains("capacityScheduler")) {
- new CapacitySchedulerParser
- } else if (schedulerType.contains("fairScheduler")) {
- new FairSchedulerParser
- } else {
- throw new IllegalArgumentException(s"Unsupported scheduler type from scheduler info. $schedulerType")
- }
- } catch {
- case throwable: Throwable =>
- logError(s"Invalid scheduler info. $info")
- throw throwable
- }
- }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala
deleted file mode 100644
index d9d80f4298..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.scheduler
-
-import org.apache.kylin.common.KapConfig
-import org.apache.kylin.engine.spark.job.KylinBuildEnv
-import org.apache.kylin.engine.spark.utils.ThreadUtils
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
-
-
-class ClusterMonitor extends Logging {
- private lazy val scheduler = //
- ThreadUtils.newDaemonSingleThreadScheduledExecutor("connect-master-guard")
- private val JOB_STEP_PREFIX = "job_step_"
-
- def scheduleAtFixedRate(func: () => Unit, period: Long): Unit = {
- scheduler.scheduleAtFixedRate(new Runnable {
- override def run(): Unit = func.apply()
- }, period, period, TimeUnit.SECONDS)
- }
-
- def monitorSparkMaster(atomicBuildEnv: AtomicReference[KylinBuildEnv], atomicSparkSession: AtomicReference[SparkSession],
- disconnectTimes: AtomicLong, atomicUnreachableSparkMaster: AtomicBoolean): Unit = {
- val config = atomicBuildEnv.get().kylinConfig
- if (KapConfig.wrap(config).isCloud && !config.isUTEnv) {
- val disconnectMaxTimes = config.getClusterManagerHealthCheckMaxTimes
- if (disconnectMaxTimes >= 0) {
- val connectPeriod = config.getClusterManagerHealCheckIntervalSecond
- logDebug(s"ClusterMonitor thread start with max times is $disconnectMaxTimes period is $connectPeriod")
- scheduleAtFixedRate(() => {
- monitor(atomicBuildEnv, atomicSparkSession, disconnectTimes, atomicUnreachableSparkMaster)
- }, connectPeriod)
- }
- }
- }
-
- def monitor(atomicBuildEnv: AtomicReference[KylinBuildEnv], atomicSparkSession: AtomicReference[SparkSession],
- disconnectTimes: AtomicLong, atomicUnreachableSparkMaster: AtomicBoolean): Unit = {
- logDebug("monitor start")
- val config = atomicBuildEnv.get().kylinConfig
- val disconnectMaxTimes = config.getClusterManagerHealthCheckMaxTimes
- try {
- val clusterManager = atomicBuildEnv.get.clusterManager
- clusterManager.applicationExisted(JOB_STEP_PREFIX + atomicBuildEnv.get().buildJobInfos.getJobStepId)
- disconnectTimes.set(0)
- atomicUnreachableSparkMaster.set(false)
- logDebug("monitor stop")
- } catch {
- case e: Exception =>
- logError(s"monitor error with : ${e.getMessage}")
- if (disconnectTimes.incrementAndGet() >= disconnectMaxTimes && atomicSparkSession.get() != null) {
- logDebug(s"Job will stop: Unable connect spark master to reach timeout maximum time")
- atomicUnreachableSparkMaster.set(true)
- atomicSparkSession.get().stop()
- }
- }
- }
-
- def shutdown(): Unit = {
- scheduler.shutdownNow()
- }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala
deleted file mode 100644
index ee72d16e51..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.scheduler
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.kylin.engine.spark.utils.ThreadUtils
-
-class JobRuntime(val maxThreadCount: Int) {
-
- private lazy val minThreads = 1
- private lazy val maxThreads = Math.max(minThreads, maxThreadCount)
- // Maybe we should parameterize nThreads.
- private lazy val threadPool = //
- ThreadUtils.newDaemonScalableThreadPool("build-thread", //
- minThreads, maxThreads, 20, TimeUnit.SECONDS)
-
- // Drain layout result using single thread.
- private lazy val scheduler = //
- ThreadUtils.newDaemonSingleThreadScheduledExecutor("build-scheduler")
-
- def submit(fun: () => Unit): Unit = {
- threadPool.submit(new Runnable {
- override def run(): Unit = fun.apply()
- })
- }
-
- def scheduleCheckpoint(fun: () => Unit): Unit = {
- scheduler.scheduleWithFixedDelay(() => fun.apply(), 30L, 30L, TimeUnit.SECONDS)
- }
-
- def schedule(func: () => Unit, delay: Long, unit: TimeUnit): Unit = {
- scheduler.schedule(new Runnable {
- override def run(): Unit = func.apply()
- }, delay, unit)
- }
-
- def shutdown(): Unit = {
- scheduler.shutdownNow()
- threadPool.shutdownNow()
- }
-
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala
deleted file mode 100644
index d0de86c938..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.scheduler
-
-sealed trait KylinJobEvent
-
-sealed trait JobStatus extends KylinJobEvent
-
-case class JobSucceeded() extends JobStatus
-
-case class JobFailed(reason: String, throwable: Throwable) extends JobStatus
-
-sealed trait JobEndReason extends KylinJobEvent
-
-sealed trait JobFailedReason extends JobEndReason
-
-case class ResourceLack(throwable: Throwable) extends JobFailedReason
-
-case class ExceedMaxRetry(throwable: Throwable) extends JobFailedReason
-
-case class UnknownThrowable(throwable: Throwable) extends JobFailedReason
-
-
-sealed trait JobCommand extends KylinJobEvent
-
-case class RunJob() extends JobCommand
-
-
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala
deleted file mode 100644
index 686a183c32..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.scheduler
-
-trait KylinJobListener {
- def onReceive(event: KylinJobEvent): Unit
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala
deleted file mode 100644
index 98a5d7e5da..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.application
-
-import org.apache.kylin.engine.spark.job.KylinBuildEnv
-import org.apache.kylin.engine.spark.scheduler._
-import io.netty.util.internal.ThrowableUtil
-import org.apache.kylin.common.util.Unsafe
-import org.apache.spark.autoheal.ExceptionTerminator
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.KylinJobEventLoop
-
-class JobMonitor(eventLoop: KylinJobEventLoop) extends Logging {
- var retryTimes = 0
- eventLoop.registerListener(new KylinJobListener {
- override def onReceive(event: KylinJobEvent): Unit = {
- event match {
- case rl: ResourceLack => handleResourceLack(rl)
- case ut: UnknownThrowable => handleUnknownThrowable(ut)
- case emr: ExceedMaxRetry => handleExceedMaxRetry(emr)
- case _ =>
- }
- }
- })
-
- def stop(): Unit = {
- }
-
- def handleResourceLack(rl: ResourceLack): Unit = {
- try {
- if (rl.throwable != null && rl.throwable.getCause != null) {
- if (rl.throwable.getCause.isInstanceOf[NoRetryException]) {
- eventLoop.post(JobFailed(rl.throwable.getCause.getMessage, rl.throwable.getCause))
- return
- }
- }
-
- logInfo(s"handleResourceLack --> ${rl.throwable.getCause}")
- val buildEnv = KylinBuildEnv.get()
- // if killed, job failed without retry
- val jobStepId = buildEnv.buildJobInfos.getJobStepId
- if (buildEnv.clusterManager.isApplicationBeenKilled(jobStepId)) {
- eventLoop.post(JobFailed(s"Submitted application $jobStepId has been killed.", rl.throwable))
- return
- }
- retryTimes += 1
- KylinBuildEnv.get().buildJobInfos.recordRetryTimes(retryTimes)
- val maxRetry = buildEnv.kylinConfig.getSparkEngineMaxRetryTime
- if (retryTimes <= maxRetry) {
- logError(s"Job failed $retryTimes time(s). Cause:", rl.throwable)
- Unsafe.setProperty("kylin.spark-conf.auto-prior", "false")
- ExceptionTerminator.resolveException(rl, eventLoop)
- } else {
- eventLoop.post(ExceedMaxRetry(rl.throwable))
- }
- } catch {
- case throwable: Throwable => eventLoop.post(JobFailed("Error occurred when generate retry configuration.", throwable))
- }
- }
-
-
- def handleExceedMaxRetry(emr: ExceedMaxRetry): Unit = {
- eventLoop.post(JobFailed("Retry times exceed MaxRetry set in the KylinConfig.", emr.throwable))
- }
-
- def handleUnknownThrowable(ur: UnknownThrowable): Unit = {
- eventLoop.post(JobFailed("Unknown error occurred during the job.", ur.throwable))
- }
-}
-
-case class RetryInfo(overrideConf: java.util.Map[String, String], throwable: Throwable) {
- override def toString: String = {
- s"""RetryInfo{
- | overrideConf : $overrideConf,
- | throwable : ${ThrowableUtil.stackTraceToString(throwable)}
- |}""".stripMargin
- }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
deleted file mode 100644
index 1837b92fa0..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.application
-
-import io.kyligence.kap.engine.spark.job.ParamsConstants
-import org.apache.commons.lang.StringUtils
-import org.apache.commons.lang3.exception.ExceptionUtils
-import org.apache.kylin.common.util.{JsonUtil, Unsafe}
-import org.apache.kylin.engine.spark.application.SparkApplication
-import org.apache.kylin.engine.spark.job.KylinBuildEnv
-import org.apache.kylin.engine.spark.scheduler._
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.KylinJobEventLoop
-
-import java.util
-import java.util.concurrent.CountDownLatch
-
-/**
- * Spark driver part, construct the real spark job [SparkApplication]
- */
-object JobWorkSpace extends Logging {
- def execute(args: Array[String]): Unit = {
- try {
- val (application, appArgs) = resolveArgs(args)
- val eventLoop = new KylinJobEventLoop
- val worker = new JobWorker(application, appArgs, eventLoop)
- val monitor = new JobMonitor(eventLoop)
- val workspace = new JobWorkSpace(eventLoop, monitor, worker)
- val statusCode = workspace.run()
- if (statusCode != 0) {
- Unsafe.systemExit(statusCode)
- }
- } catch {
- case throwable: Throwable =>
- logError("Error occurred when init job workspace.", throwable)
- Unsafe.systemExit(1)
- }
- }
-
- def resolveArgs(args: Array[String]): (SparkApplication, Array[String]) = {
- if (args.length < 2 || args(0) != "-className") throw new IllegalArgumentException("className is required")
- val className = args(1)
- // scalastyle:off
- val o = Class.forName(className).newInstance
- // scalastyle:on
- if (!o.isInstanceOf[SparkApplication]) throw new IllegalArgumentException(className + " is not a subClass of AbstractApplication")
- val appArgs = args.slice(2, args.length)
- val application = o.asInstanceOf[SparkApplication]
- (application, appArgs)
- }
-}
-
-class JobWorkSpace(eventLoop: KylinJobEventLoop, monitor: JobMonitor, worker: JobWorker) extends Logging {
- require(eventLoop != null)
- require(monitor != null)
- require(worker != null)
-
- private var statusCode: Int = 0
- private val latch = new CountDownLatch(1)
-
- eventLoop.registerListener(new KylinJobListener {
- override def onReceive(event: KylinJobEvent): Unit = {
- event match {
- case _: JobSucceeded => success()
- case jf: JobFailed => fail(jf)
- case _ =>
- }
- }
- })
-
- def run(): Int = {
- eventLoop.start()
- eventLoop.post(RunJob())
- latch.await()
- statusCode
- }
-
- def success(): Unit = {
- try {
- stop()
- } finally {
- statusCode = 0
- latch.countDown()
- }
- }
-
- def fail(jf: JobFailed): Unit = {
- try {
- logError(s"Job failed eventually. Reason: ${jf.reason}", jf.throwable)
- KylinBuildEnv.get().buildJobInfos.recordJobRetryInfos(RetryInfo(new util.HashMap, jf.throwable))
- updateJobErrorInfo(jf)
- stop()
- } finally {
- statusCode = 1
- latch.countDown()
- }
- }
-
- def stop(): Unit = {
- monitor.stop()
- worker.stop()
- eventLoop.stop()
- }
-
- def updateJobErrorInfo(jf: JobFailed): Unit = {
- val infos = KylinBuildEnv.get().buildJobInfos
- val context = worker.getApplication
-
- val project = context.getProject
- val jobId = context.getJobId
-
- val stageId = infos.getStageId
- val jobStepId = StringUtils.replace(infos.getJobStepId, SparkApplication.JOB_NAME_PREFIX, "")
- val failedStepId = if (StringUtils.isBlank(stageId)) jobStepId else stageId
-
- val failedSegmentId = infos.getSegmentId
- val failedStack = ExceptionUtils.getStackTrace(jf.throwable)
- val failedReason =
- if (context.getAtomicUnreachableSparkMaster.get()) "Unable connect spark master to reach timeout maximum time"
- else jf.reason
- val url = "/kylin/api/jobs/error"
-
- val payload: util.HashMap[String, Object] = new util.HashMap[String, Object](5)
- payload.put("project", project)
- payload.put("job_id", jobId)
- payload.put("failed_step_id", failedStepId)
- payload.put("failed_segment_id", failedSegmentId)
- payload.put("failed_stack", failedStack)
- payload.put("failed_reason", failedReason)
- val json = JsonUtil.writeValueAsString(payload)
- val params = new util.HashMap[String, String]()
- val config = KylinBuildEnv.get().kylinConfig
- params.put(ParamsConstants.TIME_OUT, config.getUpdateJobInfoTimeout.toString)
- params.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true))
- context.getReport.updateSparkJobInfo(params, url, json);
- }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
deleted file mode 100644
index 069ac171cc..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.application
-
-import java.util.concurrent.Executors
-
-import org.apache.kylin.engine.spark.application.SparkApplication
-import org.apache.kylin.engine.spark.scheduler._
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.KylinJobEventLoop
-
-class JobWorker(application: SparkApplication, args: Array[String], eventLoop: KylinJobEventLoop) extends Logging {
- private val pool = Executors.newSingleThreadExecutor()
-
- def getApplication: SparkApplication = application
-
- eventLoop.registerListener(new KylinJobListener {
- override def onReceive(event: KylinJobEvent): Unit = {
- event match {
- case _: RunJob => runJob()
- case _ =>
- }
- }
- })
-
- def stop(): Unit = {
- pool.shutdownNow()
- application.logJobInfo()
- }
-
- private def runJob(): Unit = {
- execute()
- }
-
-
- private def execute(): Unit = {
- pool.execute(new Runnable {
- override def run(): Unit = {
- try {
- application.execute(args)
- eventLoop.post(JobSucceeded())
- } catch {
- case exception: NoRetryException => eventLoop.post(UnknownThrowable(exception))
- case throwable: Throwable => eventLoop.post(ResourceLack(throwable))
- }
- }
- })
- }
-}
-
-class NoRetryException(msg: String) extends java.lang.Exception(msg) {
- def this() {
- this(null)
- }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala
deleted file mode 100644
index 5cefd24054..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.master
-
-import java.util
-
-import org.apache.kylin.cluster.{AvailableResource, IClusterManager, ResourceInfo}
-import org.apache.kylin.common.KylinConfig
-import org.apache.spark.deploy.DeployMessages.{KillApplication, MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.master.StandaloneClusterManager.masterEndpoints
-import org.apache.spark.internal.Logging
-import org.apache.spark.rpc.{RpcAddress, RpcEnv}
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.Utils
-import org.apache.spark.{SecurityManager, SparkConf}
-
-// scalastyle:off
-class StandaloneClusterManager extends IClusterManager with Logging {
-
- private val JOB_STEP_PREFIX = "job_step_"
-
- override def fetchMaximumResourceAllocation: ResourceInfo = {
- val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState)
- val aliveWorkers = state.workers.filter(_.state == WorkerState.ALIVE)
- val availableMem = aliveWorkers.map(_.memoryFree).sum
- val availableCores = aliveWorkers.map(_.coresFree).sum
- logInfo(s"Get available resource, " +
- s"availableMem: $availableMem, availableCores: $availableCores")
- ResourceInfo(availableMem, availableCores)
- }
-
- override def fetchQueueAvailableResource(queueName: String): AvailableResource = {
- val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState)
- val aliveWorkers = state.workers.filter(_.state == WorkerState.ALIVE)
- val availableMem = aliveWorkers.map(_.memoryFree).sum
- val availableCores = aliveWorkers.map(_.coresFree).sum
- val totalMem = aliveWorkers.map(_.memory).sum
- val totalCores = aliveWorkers.map(_.cores).sum
- logInfo(s"Get available resource, " +
- s"availableMem: $availableMem, availableCores: $availableCores, " +
- s"totalMem: $totalMem, totalCores: $totalCores")
- AvailableResource(ResourceInfo(availableMem, availableCores), ResourceInfo(totalMem, totalCores))
- }
-
- override def getBuildTrackingUrl(sparkSession: SparkSession): String = {
- val applicationId = sparkSession.sparkContext.applicationId
- logInfo(s"Get tracking url of application $applicationId")
- val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState)
- val app = state.activeApps.find(_.id == applicationId).orNull
- if (app == null) {
- logInfo(s"No active application found of applicationId $applicationId")
- return null
- }
- app.desc.appUiUrl
- }
-
- override def killApplication(jobStepId: String): Unit = {
- killApplication(s"$JOB_STEP_PREFIX", jobStepId)
- }
-
- override def killApplication(jobStepPrefix: String, jobStepId: String): Unit = {
- val master = masterEndpoints(0)
- val state = master.askSync[MasterStateResponse](RequestMasterState)
- val app = state.activeApps.find(_.desc.name.equals(jobStepPrefix + jobStepId)).orNull
- if (app == null) {
- logInfo(s"No active application found of jobStepId $jobStepId")
- return
- }
- logInfo(s"Kill application ${app.id} by jobStepId $jobStepId")
- master.send(KillApplication(app.id))
- }
-
- override def getRunningJobs(queues: util.Set[String]): util.List[String] = {
- val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState)
- val jobStepNames = state.activeApps.map(_.desc.name)
- logInfo(s"Get running jobs ${jobStepNames.toSeq}")
- import scala.collection.JavaConverters._
- jobStepNames.toList.asJava
- }
-
- override def fetchQueueStatistics(queueName: String): ResourceInfo = {
- fetchMaximumResourceAllocation
- }
-
- override def isApplicationBeenKilled(jobStepId: String): Boolean = {
- val master = masterEndpoints(0)
- val state = master.askSync[MasterStateResponse](RequestMasterState)
- val app = state.completedApps.find(_.desc.name.equals(s"$JOB_STEP_PREFIX$jobStepId")).orNull
- if (app == null) {
- false
- } else {
- "KILLED".equals(app.state.toString)
- }
- }
-
- override def applicationExisted(jobId: String): Boolean = {
- val master = masterEndpoints(0)
- val state = master.askSync[MasterStateResponse](RequestMasterState)
- val app = state.activeApps.find(_.desc.name.equalsIgnoreCase(jobId)).orNull
- if (app == null) {
- false
- } else {
- true
- }
- }
-
-}
-
-object StandaloneClusterManager extends Logging {
-
- private val ENDPOINT_NAME = "Master"
- private val CLIENT_NAME = "kylinStandaloneClient"
- private val SPARK_LOCAL = "local"
- private val SPARK_MASTER = "spark.master"
- private val SPARK_RPC_TIMEOUT = "spark.rpc.askTimeout"
- private val SPARK_AUTHENTICATE = "spark.authenticate"
- private val SPARK_AUTHENTICATE_SECRET = "spark.authenticate.secret"
- private val SPARK_NETWORK_CRYPTO_ENABLED = "spark.network.crypto.enabled"
-
- private lazy val masterEndpoints = {
- val overrideConfig = KylinConfig.getInstanceFromEnv.getSparkConfigOverride
- val conf = new SparkConf()
- if (!conf.contains(SPARK_MASTER) || conf.get(SPARK_MASTER).startsWith(SPARK_LOCAL)) {
- conf.set(SPARK_MASTER, overrideConfig.get(SPARK_MASTER))
- }
- if (!conf.contains(SPARK_RPC_TIMEOUT)) {
- conf.set(SPARK_RPC_TIMEOUT, "10s")
- }
- if (overrideConfig.containsKey(SPARK_AUTHENTICATE) && "true".equals(overrideConfig.get(SPARK_AUTHENTICATE))) {
- conf.set(SPARK_AUTHENTICATE, "true")
- conf.set(SPARK_AUTHENTICATE_SECRET, overrideConfig.getOrDefault(SPARK_AUTHENTICATE_SECRET, "kylin"))
- conf.set(SPARK_NETWORK_CRYPTO_ENABLED, overrideConfig.getOrDefault(SPARK_NETWORK_CRYPTO_ENABLED, "true"))
- }
- logInfo(s"Spark master ${conf.get(SPARK_MASTER)}")
- val rpcEnv = RpcEnv.create(CLIENT_NAME, Utils.localHostName(), 0, conf, new SecurityManager(conf))
- val masterUrls = conf.get(SPARK_MASTER)
- Utils.parseStandaloneMasterUrls(masterUrls)
- .map(RpcAddress.fromSparkURL)
- .map(rpcEnv.setupEndpointRef(_, ENDPOINT_NAME))
- }
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala
deleted file mode 100644
index 38f1554a01..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler
-
-import java.util
-import java.util.{List => JList}
-
-import org.apache.kylin.engine.spark.scheduler.{KylinJobEvent, KylinJobListener}
-import org.apache.spark.util.EventLoop
-
-import scala.collection.JavaConverters._
-
-class KylinJobEventLoop extends EventLoop[KylinJobEvent]("spark-entry-event-loop") {
- // register listener in single thread, so LinkedList is enough
- private var listeners: JList[KylinJobListener] = new util.LinkedList[KylinJobListener]()
-
- def registerListener(listener: KylinJobListener): Boolean = {
- listeners.add(listener)
- }
-
- def unregisterListener(listener: KylinJobListener): Boolean = {
- listeners.remove(listener)
- }
-
- override protected def onReceive(event: KylinJobEvent): Unit = {
- listeners.asScala.foreach(_.onReceive(event))
- }
-
- override protected def onError(e: Throwable): Unit = {
-
- }
-}