This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

The following commit(s) were added to refs/heads/kylin5 by this push:
     new aa462c78fe  KYLIN-5217 Create an initial commit
aa462c78fe is described below

commit aa462c78fec33127427ce3246fe5500e9af7077d
Author: longfei.jiang <longfei.ji...@kyligence.io>
AuthorDate: Tue Aug 9 14:05:41 2022 +0800

    KYLIN-5217 Create an initial commit
---
 .../kylin/cluster/ClusterManagerFactory.scala      |  37 ----
 .../apache/kylin/cluster/K8sClusterManager.scala   |  65 ------
 .../kylin/cluster/SchedulerInfoCmdHelper.scala     | 167 -----------------
 .../kylin/cluster/StandaloneClusterManager.scala   |  22 ---
 .../apache/kylin/cluster/YarnClusterManager.scala  | 206 ---------------------
 .../cluster/parser/CapacitySchedulerParser.scala   |  92 ---------
 .../kylin/cluster/parser/FairSchedulerParser.scala |  64 -------
 .../kylin/cluster/parser/SchedulerParser.scala     |  43 -----
 .../cluster/parser/SchedulerParserFactory.scala    |  43 -----
 .../engine/spark/scheduler/ClusterMonitor.scala    |  82 --------
 .../kylin/engine/spark/scheduler/JobRuntime.scala  |  59 ------
 .../engine/spark/scheduler/KylinJobEvent.scala     |  44 -----
 .../engine/spark/scheduler/KylinJobListener.scala  |  23 ---
 .../org/apache/spark/application/JobMonitor.scala  |  94 ----------
 .../apache/spark/application/JobWorkSpace.scala    | 153 ---------------
 .../org/apache/spark/application/JobWorker.scala   |  71 -------
 .../deploy/master/StandaloneClusterManager.scala   | 157 ----------------
 .../apache/spark/scheduler/KylinJobEventLoop.scala |  48 -----
 18 files changed, 1470 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala deleted file mode 100644 index 6009232f57..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.kylin.cluster - -import java.util.concurrent.{ExecutorService, Executors, TimeUnit} -import io.kyligence.kap.guava20.shaded.common.util.concurrent.SimpleTimeLimiter -import org.apache.kylin.common.KylinConfig -import org.apache.spark.util.KylinReflectUtils - - -object ClusterManagerFactory { - - val pool: ExecutorService = Executors.newCachedThreadPool - - def create(kylinConfig: KylinConfig): IClusterManager = { - val timeLimiter = SimpleTimeLimiter.create(pool) - val target = KylinReflectUtils.createObject(kylinConfig.getClusterManagerClassName)._1.asInstanceOf[IClusterManager] - - timeLimiter.newProxy(target, classOf[IClusterManager], kylinConfig.getClusterManagerTimeoutThreshold, TimeUnit.SECONDS); - } -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala deleted file mode 100644 index 8711622885..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kylin.cluster - -import java.util - -import com.google.common.collect.Lists -import org.apache.spark.internal.Logging -import org.apache.spark.sql.SparkSession - -class K8sClusterManager extends IClusterManager with Logging { - override def fetchMaximumResourceAllocation: ResourceInfo = { - ResourceInfo(Int.MaxValue, 1000) - } - - override def fetchQueueAvailableResource(queueName: String): AvailableResource = { - AvailableResource(ResourceInfo(Int.MaxValue, 1000), ResourceInfo(Int.MaxValue, 1000)) - } - - override def getBuildTrackingUrl(sparkSession: SparkSession): String = { - logInfo("Get Build Tracking Url!") - val applicationId = sparkSession.sparkContext.applicationId - "" - } - - override def killApplication(jobStepId: String): Unit = { - logInfo("Kill Application $jobStepId !") - } - - override def killApplication(jobStepPrefix: String, jobStepId: String): Unit = { - logInfo("Kill Application $jobStepPrefix $jobStepId !") - } - - override def isApplicationBeenKilled(applicationId: String): Boolean = { - true - } - - override def getRunningJobs(queues: util.Set[String]): util.List[String] = { - Lists.newArrayList() - } - - override def fetchQueueStatistics(queueName: String): ResourceInfo = { - ResourceInfo(Int.MaxValue, 1000) - } - - override def applicationExisted(jobId: String): Boolean = { - false - } - -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala deleted file mode 100644 index 95c35a8a75..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.cluster - -import org.apache.kylin.engine.spark.utils.StorageUtils -import io.netty.util.internal.ThrowableUtil -import org.apache.hadoop.yarn.conf.{HAUtil, YarnConfiguration} -import org.apache.kylin.common.util.{JsonUtil, ShellException} -import org.apache.spark.internal.Logging - -import java.io.{BufferedReader, InputStreamReader} -import java.nio.charset.StandardCharsets - -object SchedulerInfoCmdHelper extends Logging { - private val useHttps: Boolean = YarnConfiguration.useHttps(StorageUtils.getCurrentYarnConfiguration) - - def schedulerInfo: String = { - val cmds = getSocketAddress.map(address => genCmd(address._1, address._2)) - getInfoByCmds(cmds) - } - - def metricsInfo: String = { - val cmds = getSocketAddress.map(address => genMetricsCmd(address._1, address._2)) - getInfoByCmds(cmds) - } - - private[cluster] def getInfoByCmds(cmds: Iterable[String]): String = { - val results = cmds.map { cmd => - try { - logInfo(s"Executing the command: ${cmd}") - execute(cmd) - } catch { - case throwable: Throwable => (1, ThrowableUtil.stackTraceToString(throwable)) - } - } - val tuples = results.filter(result => result._1 == 0 && JsonUtil.isJson(result._2)) - if (tuples.isEmpty) { - val errors = tuples.map(_._2).mkString("\n") - logWarning(s"Error occurred when get scheduler info from cmd $cmds") - throw new RuntimeException(errors) - } else { - require(tuples.size == 1) - tuples.head._2 - } - } - - private[cluster] def getSocketAddress: Map[String, Int] = { - val conf = StorageUtils.getCurrentYarnConfiguration - val addresses = if (HAUtil.isHAEnabled(conf)) { - val haIds = HAUtil.getRMHAIds(conf).toArray - require(haIds.nonEmpty, "Ha ids is empty, please check your yarn-site.xml.") - if (useHttps) { - haIds.map(id => conf.getSocketAddr(s"${YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS}.$id", - YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_HTTPS_PORT)) - } else { - haIds.map(id => conf.getSocketAddr(s"${YarnConfiguration.RM_WEBAPP_ADDRESS}.$id", - YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_PORT)) - } - } else { - if (useHttps) { - Array(conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, - YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_HTTPS_PORT)) - } else { - Array(conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_PORT)) - } - } - addresses.map(address => address.getHostName -> address.getPort).toMap - } - - private[cluster] def genCmd(hostName: String, port: Int): String = { - val uri = if (useHttps) { - s"https://$hostName:$port/ws/v1/cluster/scheduler" - } else { - s"http://$hostName:$port/ws/v1/cluster/scheduler" - } - s"""curl -k --negotiate -u : "$uri"""" - } - - private[cluster] def genMetricsCmd(hostName: String, port: Int): String = { - val uri = if (useHttps) { - s"https://$hostName:$port/ws/v1/cluster/metrics" - } else { - s"http://$hostName:$port/ws/v1/cluster/metrics" - } - s"""curl -k --negotiate -u : "$uri"""" - } - - /** - * only return std out after execute command - * - * @param command - * @return - */ - private[cluster] def execute(command: String): (Int, String) = { - try { - val cmd = new Array[String](3) - val osName = System.getProperty("os.name") - if (osName.startsWith("Windows")) { - cmd(0) = "cmd.exe" - cmd(1) = "/C" - } - else { - cmd(0) = "/bin/bash" - cmd(1) = "-c" - } - cmd(2) = command - val builder = new 
ProcessBuilder(cmd: _*) - builder.environment().putAll(System.getenv()) - val proc = builder.start - val resultStdout = new StringBuilder - val inReader = new BufferedReader(new InputStreamReader(proc.getInputStream, StandardCharsets.UTF_8.name())) - val newLine = System.getProperty("line.separator") - var line: String = inReader.readLine() - while (line != null) { - resultStdout.append(line).append(newLine) - line = inReader.readLine() - } - - val stderr = new StringBuilder - val errorReader = new BufferedReader(new InputStreamReader(proc.getErrorStream, StandardCharsets.UTF_8.name())) - line = errorReader.readLine() - while (line != null) { - stderr.append(line).append(newLine) - line = errorReader.readLine() - } - - logInfo(s"The corresponding http response for the above command: \n ${stderr}") - try { - val exitCode = proc.waitFor - if (exitCode != 0) { - logError(s"executing command $command; exit code: $exitCode") - logError(s"==========================[stderr]===============================") - logError(stderr.toString) - logError(s"==========================[stderr]===============================") - - logError(s"==========================[stdout]===============================") - logError(resultStdout.toString) - logError(s"==========================[stdout]===============================") - } - (exitCode, resultStdout.toString) - } catch { - case e: InterruptedException => - Thread.currentThread.interrupt() - throw e - } - } catch { - case e: Exception => throw new ShellException(e) - } - } -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala deleted file mode 100644 index c3737a9090..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.cluster - -class StandaloneClusterManager extends org.apache.spark.deploy.master.StandaloneClusterManager { -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala deleted file mode 100644 index c3f6275a0a..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.cluster - -import java.io.IOException -import java.util -import java.util.stream.Collectors -import java.util.{ArrayList, EnumSet, List, Set} - -import com.google.common.collect.{Lists, Sets} -import org.apache.kylin.cluster.parser.SchedulerParserFactory -import org.apache.kylin.engine.spark.utils.StorageUtils -import org.apache.commons.collections.CollectionUtils -import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, QueueInfo, YarnApplicationState} -import org.apache.hadoop.yarn.client.api.YarnClient -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.exceptions.YarnException -import org.apache.spark.internal.Logging -import org.apache.spark.sql.SparkSession - -import scala.collection.JavaConverters._ - -class YarnClusterManager extends IClusterManager with Logging { - - import org.apache.kylin.cluster.YarnClusterManager._ - - private lazy val maximumResourceAllocation: ResourceInfo = { - withYarnClient(yarnClient => { - try { - val response = yarnClient.createApplication().getNewApplicationResponse - val cap = response.getMaximumResourceCapability - val resourceInfo = ResourceInfo(cap.getMemory, cap.getVirtualCores) - logInfo(s"Cluster maximum resource allocation $resourceInfo") - resourceInfo - } catch { - case throwable: Throwable => - logError("Error occurred when get resource upper limit per container.", throwable) - throw throwable - } - }) - } - - override def fetchMaximumResourceAllocation: ResourceInfo = maximumResourceAllocation - - override def fetchQueueAvailableResource(queueName: String): AvailableResource = { - val info = SchedulerInfoCmdHelper.schedulerInfo - val parser = SchedulerParserFactory.create(info) - parser.parse(info) - parser.availableResource(queueName) - } - - def getYarnApplicationReport(applicationId: String): ApplicationReport = { - withYarnClient(yarnClient => { - val array = applicationId.split("_") - if (array.length < 3) return null - val appId = ApplicationId.newInstance(array(1).toLong, array(2).toInt) - yarnClient.getApplicationReport(appId) - }) - } - - override def getBuildTrackingUrl(sparkSession: SparkSession): String = { - val applicationId = sparkSession.sparkContext.applicationId - val applicationReport = getYarnApplicationReport(applicationId) - if (null == applicationReport) null - else applicationReport.getTrackingUrl - } - - def killApplication(jobStepId: String): Unit = { - killApplication("job_step_", jobStepId) - } - - def killApplication(jobStepPrefix: String, jobStepId: String): Unit = { - withYarnClient(yarnClient => { - var orphanApplicationId: String = null - try { - val types: Set[String] = Sets.newHashSet("SPARK") - val states: EnumSet[YarnApplicationState] = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, - YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING) - val applicationReports: 
List[ApplicationReport] = yarnClient.getApplications(types, states) - if (CollectionUtils.isEmpty(applicationReports)) return - import scala.collection.JavaConverters._ - for (report <- applicationReports.asScala) { - if (report.getName.equalsIgnoreCase(jobStepPrefix + jobStepId)) { - orphanApplicationId = report.getApplicationId.toString - yarnClient.killApplication(report.getApplicationId) - logInfo(s"kill orphan yarn application $orphanApplicationId succeed, job step $jobStepId") - } - } - } catch { - case ex@(_: YarnException | _: IOException) => - logError(s"kill orphan yarn application $orphanApplicationId failed, job step $jobStepId", ex) - } - }) - } - - def getRunningJobs(queues: Set[String]): List[String] = { - withYarnClient(yarnClient => { - if (queues.isEmpty) { - val applications = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING)) - if (null == applications) new ArrayList[String]() - else applications.asScala.map(_.getName).asJava - } else { - val runningJobs: List[String] = new ArrayList[String]() - for (queue <- queues.asScala) { - val applications = yarnClient.getQueueInfo(queue).getApplications - if (null != applications) { - applications.asScala.filter(_.getYarnApplicationState == YarnApplicationState.RUNNING).map(_.getName).map(runningJobs.add) - } - } - runningJobs - } - }) - } - - override def fetchQueueStatistics(queueName: String): ResourceInfo = { - withYarnClient(yarnClient => { - val qs = yarnClient.getQueueInfo(queueName).getQueueStatistics - ResourceInfo(qs.getAvailableMemoryMB.toInt, qs.getAvailableVCores.toInt) - }) - } - - override def isApplicationBeenKilled(jobStepId: String): Boolean = { - withYarnClient(yarnClient => { - val types: Set[String] = Sets.newHashSet("SPARK") - val states: EnumSet[YarnApplicationState] = EnumSet.of(YarnApplicationState.KILLED) - val applicationReports: List[ApplicationReport] = yarnClient.getApplications(types, states) - if (!CollectionUtils.isEmpty(applicationReports)) { - import scala.collection.JavaConverters._ - for (report <- applicationReports.asScala) { - if (report.getName.equalsIgnoreCase("job_step_" + jobStepId)) { - return true - } - } - } - false - }) - } - - def applicationExisted(jobId: String): Boolean = { - withYarnClient(yarnClient => { - val types: util.Set[String] = Sets.newHashSet("SPARK") - val states: util.EnumSet[YarnApplicationState] = util.EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, - YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING) - val applicationReports: util.List[ApplicationReport] = yarnClient.getApplications(types, states) - if (!CollectionUtils.isEmpty(applicationReports)) { - import scala.collection.JavaConverters._ - for (report <- applicationReports.asScala) { - if (report.getName.equalsIgnoreCase(jobId)) { - return true - } - } - } - return false - }) - } - - def listQueueNames(): util.List[String] = { - withYarnClient(yarnClient => { - val queues: util.List[QueueInfo] = yarnClient.getAllQueues - val queueNames: util.List[String] = Lists.newArrayList() - for (queue <- queues.asScala) { - queueNames.add(queue.getQueueName) - } - queueNames - }) - } - -} - -object YarnClusterManager { - - def withYarnClient[T](body: YarnClient => T): T = { - val yarnClient = YarnClient.createYarnClient - yarnClient.init(getSpecifiedConf) - yarnClient.start() - try { - body(yarnClient) - } finally { - yarnClient.close() - } - } - - private def getSpecifiedConf: YarnConfiguration = { - // yarn cluster mode couldn't access 
local hadoop_conf_dir - val yarnConf = StorageUtils.getCurrentYarnConfiguration - // https://issues.apache.org/jira/browse/SPARK-15343 - yarnConf.set("yarn.timeline-service.enabled", "false") - yarnConf - } -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala deleted file mode 100644 index e22700c7e7..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.cluster.parser - -import java.util.{List => JList} -import com.fasterxml.jackson.databind.JsonNode -import org.apache.kylin.cluster.{AvailableResource, ResourceInfo} -import org.apache.kylin.engine.spark.application.SparkApplication -import org.apache.kylin.engine.spark.job.KylinBuildEnv - -import scala.collection.JavaConverters._ - -class CapacitySchedulerParser extends SchedulerParser { - - override def availableResource(queueName: String): AvailableResource = { - val queues: JList[JsonNode] = root.findParents("queueName") - val nodes = queues.asScala.filter(queue => parseValue(queue.get("queueName")).equals(queueName)) - require(nodes.size == 1) - - var (queueAvailable, queueMax) = queueCapacity(nodes.head) - val totalResource = calTotalResource(nodes.head) - val clusterNode = root.findValue("schedulerInfo") - val cluster = clusterAvailableCapacity(clusterNode) - var min = Math.min(queueAvailable, cluster) - logInfo(s"queueAvailable is ${queueAvailable}, min is ${min}, queueMax is ${queueMax}") - if (KylinBuildEnv.get().kylinConfig.useDynamicResourcePlan() && queueMax == 0.0) { - logInfo("configure yarn queue using dynamic resource plan in capacity scheduler") - queueMax = 1.0 - } - if (KylinBuildEnv.get().kylinConfig.useDynamicResourcePlan() && min == 0.0) { - logInfo("configure yarn queue using dynamic resource plan in capacity scheduler") - min = 1.0 - } - var resource = AvailableResource(totalResource.percentage(min), totalResource.percentage(queueMax)) - try { - val queueAvailableRes = KylinBuildEnv.get.clusterManager.fetchQueueStatistics(queueName) - resource = AvailableResource(queueAvailableRes, totalResource.percentage(queueMax)) - } catch { - case e: Error => - logInfo(s"The current hadoop version does not support QueueInfo.getQueueStatistics method.") - None - } - - logInfo(s"Capacity actual available resource: $resource.") - resource - } - - private def clusterAvailableCapacity(node: JsonNode): Double = { - val max = parseValue(node.get("capacity")).toDouble - val used = parseValue(node.get("usedCapacity")).toDouble - 
val capacity = (max - used) / 100 - logInfo(s"Cluster available capacity: $capacity.") - capacity - } - - private def queueCapacity(node: JsonNode): (Double, Double) = { - val max = parseValue(node.get("absoluteMaxCapacity")).toDouble - val used = parseValue(node.get("absoluteUsedCapacity")).toDouble - val available = (max - used) / 100 - logInfo(s"Queue available capacity: $available.") - (available, max / 100) - } - - private def calTotalResource(node: JsonNode): ResourceInfo = { - val usedMemory = parseValue(node.get("resourcesUsed").get("memory")).toInt - if (usedMemory != 0) { - val usedCapacity = parseValue(node.get("absoluteUsedCapacity")).toDouble / 100 - val resource = ResourceInfo(Math.floor(usedMemory / usedCapacity).toInt, Int.MaxValue) - logInfo(s"Estimate total cluster resource is $resource.") - resource - } else { - logInfo("Current queue used memory is 0, seem available resource as infinite.") - ResourceInfo(Int.MaxValue, Int.MaxValue) - } - } -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala deleted file mode 100644 index f2c9125191..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.cluster.parser - -import java.util.{List => JList} - -import com.fasterxml.jackson.databind.JsonNode -import org.apache.kylin.cluster.{AvailableResource, ResourceInfo} - -import scala.collection.JavaConverters._ - -class FairSchedulerParser extends SchedulerParser { - - override def availableResource(queueName: String): AvailableResource = { - val cluster = clusterAvailableResource() - val queue = queueAvailableResource(queueName) - val resource = AvailableResource(cluster.available.reduceMin(queue.available), queue.max) - logInfo(s"Current actual available resource: $resource.") - resource - } - - private def queueAvailableResource(queueName: String): AvailableResource = { - val queues: JList[JsonNode] = root.findParents("queueName") - val nodes = queues.asScala.filter(queue => parseValue(queue.get("queueName")).equals(queueName)) - require(nodes.size == 1) - val resource = calAvailableResource(nodes.head) - logInfo(s"Queue available resource: $resource.") - resource - } - - private def clusterAvailableResource(): AvailableResource = { - val node = root.findValue("rootQueue") - require(node != null) - val resource = calAvailableResource(node) - logInfo(s"Cluster available resource: $resource.") - resource - } - - private def calAvailableResource(node: JsonNode): AvailableResource = { - val maxResources = node.get("maxResources") - val maxMemory = parseValue(maxResources.get("memory")).toInt - val maxCores = parseValue(maxResources.get("vCores")).toInt - val usedResources = node.get("usedResources") - val usedMemory = parseValue(usedResources.get("memory")).toInt - val usedCores = parseValue(usedResources.get("vCores")).toInt - AvailableResource(ResourceInfo(maxMemory - usedMemory, maxCores - usedCores), ResourceInfo(maxMemory, maxCores)) - } -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala deleted file mode 100644 index 9db043abc7..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.cluster.parser - -import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} -import org.apache.kylin.cluster.AvailableResource -import org.apache.spark.internal.Logging - -trait SchedulerParser extends Logging { - protected var root: JsonNode = _ - protected lazy val mapper = new ObjectMapper - - def availableResource(queueName: String): AvailableResource - - def parse(schedulerInfo: String): Unit = { - this.root = mapper.readTree(schedulerInfo) - } - - // value in some scheduler info format to "value" - protected def parseValue(node: JsonNode): String = { - if (node.toString.startsWith("\"")) { - node.toString.replace("\"", "") - } else { - node.toString - } - } -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala deleted file mode 100644 index 315ae3155d..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.cluster.parser - -import org.apache.kylin.common.util.JsonUtil -import org.apache.spark.internal.Logging - -object SchedulerParserFactory extends Logging{ - def create(info: String): SchedulerParser = { - try { - val schedulerType = JsonUtil.readValueAsTree(info).findValue("type").toString - // call contains rather than equals cause of value is surrounded with ", for example: {"type":"fairScheduler"} - // do not support FifoScheduler for now - if (schedulerType.contains("capacityScheduler")) { - new CapacitySchedulerParser - } else if (schedulerType.contains("fairScheduler")) { - new FairSchedulerParser - } else { - throw new IllegalArgumentException(s"Unsupported scheduler type from scheduler info. $schedulerType") - } - } catch { - case throwable: Throwable => - logError(s"Invalid scheduler info. $info") - throw throwable - } - } -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala deleted file mode 100644 index d9d80f4298..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.engine.spark.scheduler - -import org.apache.kylin.common.KapConfig -import org.apache.kylin.engine.spark.job.KylinBuildEnv -import org.apache.kylin.engine.spark.utils.ThreadUtils -import org.apache.spark.internal.Logging -import org.apache.spark.sql.SparkSession - -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference} - - -class ClusterMonitor extends Logging { - private lazy val scheduler = // - ThreadUtils.newDaemonSingleThreadScheduledExecutor("connect-master-guard") - private val JOB_STEP_PREFIX = "job_step_" - - def scheduleAtFixedRate(func: () => Unit, period: Long): Unit = { - scheduler.scheduleAtFixedRate(new Runnable { - override def run(): Unit = func.apply() - }, period, period, TimeUnit.SECONDS) - } - - def monitorSparkMaster(atomicBuildEnv: AtomicReference[KylinBuildEnv], atomicSparkSession: AtomicReference[SparkSession], - disconnectTimes: AtomicLong, atomicUnreachableSparkMaster: AtomicBoolean): Unit = { - val config = atomicBuildEnv.get().kylinConfig - if (KapConfig.wrap(config).isCloud && !config.isUTEnv) { - val disconnectMaxTimes = config.getClusterManagerHealthCheckMaxTimes - if (disconnectMaxTimes >= 0) { - val connectPeriod = config.getClusterManagerHealCheckIntervalSecond - logDebug(s"ClusterMonitor thread start with max times is $disconnectMaxTimes period is $connectPeriod") - scheduleAtFixedRate(() => { - monitor(atomicBuildEnv, atomicSparkSession, disconnectTimes, atomicUnreachableSparkMaster) - }, connectPeriod) - } - } - } - - def monitor(atomicBuildEnv: AtomicReference[KylinBuildEnv], atomicSparkSession: AtomicReference[SparkSession], - disconnectTimes: AtomicLong, atomicUnreachableSparkMaster: AtomicBoolean): Unit = { - logDebug("monitor start") - val config = atomicBuildEnv.get().kylinConfig - val disconnectMaxTimes = config.getClusterManagerHealthCheckMaxTimes - try { - val clusterManager = atomicBuildEnv.get.clusterManager - clusterManager.applicationExisted(JOB_STEP_PREFIX + atomicBuildEnv.get().buildJobInfos.getJobStepId) - disconnectTimes.set(0) - atomicUnreachableSparkMaster.set(false) - logDebug("monitor stop") - } catch { - case e: Exception => - logError(s"monitor error with : ${e.getMessage}") - if (disconnectTimes.incrementAndGet() >= disconnectMaxTimes && atomicSparkSession.get() != null) { - logDebug(s"Job will stop: Unable connect spark master to reach timeout maximum time") - atomicUnreachableSparkMaster.set(true) - atomicSparkSession.get().stop() - } - } - } - - def shutdown(): Unit = { - scheduler.shutdownNow() - } -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala deleted file mode 100644 index ee72d16e51..0000000000 --- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.engine.spark.scheduler - -import java.util.concurrent.TimeUnit - -import org.apache.kylin.engine.spark.utils.ThreadUtils - -class JobRuntime(val maxThreadCount: Int) { - - private lazy val minThreads = 1 - private lazy val maxThreads = Math.max(minThreads, maxThreadCount) - // Maybe we should parameterize nThreads. - private lazy val threadPool = // - ThreadUtils.newDaemonScalableThreadPool("build-thread", // - minThreads, maxThreads, 20, TimeUnit.SECONDS) - - // Drain layout result using single thread. - private lazy val scheduler = // - ThreadUtils.newDaemonSingleThreadScheduledExecutor("build-scheduler") - - def submit(fun: () => Unit): Unit = { - threadPool.submit(new Runnable { - override def run(): Unit = fun.apply() - }) - } - - def scheduleCheckpoint(fun: () => Unit): Unit = { - scheduler.scheduleWithFixedDelay(() => fun.apply(), 30L, 30L, TimeUnit.SECONDS) - } - - def schedule(func: () => Unit, delay: Long, unit: TimeUnit): Unit = { - scheduler.schedule(new Runnable { - override def run(): Unit = func.apply() - }, delay, unit) - } - - def shutdown(): Unit = { - scheduler.shutdownNow() - threadPool.shutdownNow() - } - -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala deleted file mode 100644 index d0de86c938..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.engine.spark.scheduler - -sealed trait KylinJobEvent - -sealed trait JobStatus extends KylinJobEvent - -case class JobSucceeded() extends JobStatus - -case class JobFailed(reason: String, throwable: Throwable) extends JobStatus - -sealed trait JobEndReason extends KylinJobEvent - -sealed trait JobFailedReason extends JobEndReason - -case class ResourceLack(throwable: Throwable) extends JobFailedReason - -case class ExceedMaxRetry(throwable: Throwable) extends JobFailedReason - -case class UnknownThrowable(throwable: Throwable) extends JobFailedReason - - -sealed trait JobCommand extends KylinJobEvent - -case class RunJob() extends JobCommand - - diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala deleted file mode 100644 index 686a183c32..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.engine.spark.scheduler - -trait KylinJobListener { - def onReceive(event: KylinJobEvent): Unit -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala deleted file mode 100644 index 98a5d7e5da..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.application - -import org.apache.kylin.engine.spark.job.KylinBuildEnv -import org.apache.kylin.engine.spark.scheduler._ -import io.netty.util.internal.ThrowableUtil -import org.apache.kylin.common.util.Unsafe -import org.apache.spark.autoheal.ExceptionTerminator -import org.apache.spark.internal.Logging -import org.apache.spark.scheduler.KylinJobEventLoop - -class JobMonitor(eventLoop: KylinJobEventLoop) extends Logging { - var retryTimes = 0 - eventLoop.registerListener(new KylinJobListener { - override def onReceive(event: KylinJobEvent): Unit = { - event match { - case rl: ResourceLack => handleResourceLack(rl) - case ut: UnknownThrowable => handleUnknownThrowable(ut) - case emr: ExceedMaxRetry => handleExceedMaxRetry(emr) - case _ => - } - } - }) - - def stop(): Unit = { - } - - def handleResourceLack(rl: ResourceLack): Unit = { - try { - if (rl.throwable != null && rl.throwable.getCause != null) { - if (rl.throwable.getCause.isInstanceOf[NoRetryException]) { - eventLoop.post(JobFailed(rl.throwable.getCause.getMessage, rl.throwable.getCause)) - return - } - } - - logInfo(s"handleResourceLack --> ${rl.throwable.getCause}") - val buildEnv = KylinBuildEnv.get() - // if killed, job failed without retry - val jobStepId = buildEnv.buildJobInfos.getJobStepId - if (buildEnv.clusterManager.isApplicationBeenKilled(jobStepId)) { - eventLoop.post(JobFailed(s"Submitted application $jobStepId has been killed.", rl.throwable)) - return - } - retryTimes += 1 - KylinBuildEnv.get().buildJobInfos.recordRetryTimes(retryTimes) - val maxRetry = buildEnv.kylinConfig.getSparkEngineMaxRetryTime - if (retryTimes <= maxRetry) { - logError(s"Job failed $retryTimes time(s). Cause:", rl.throwable) - Unsafe.setProperty("kylin.spark-conf.auto-prior", "false") - ExceptionTerminator.resolveException(rl, eventLoop) - } else { - eventLoop.post(ExceedMaxRetry(rl.throwable)) - } - } catch { - case throwable: Throwable => eventLoop.post(JobFailed("Error occurred when generate retry configuration.", throwable)) - } - } - - - def handleExceedMaxRetry(emr: ExceedMaxRetry): Unit = { - eventLoop.post(JobFailed("Retry times exceed MaxRetry set in the KylinConfig.", emr.throwable)) - } - - def handleUnknownThrowable(ur: UnknownThrowable): Unit = { - eventLoop.post(JobFailed("Unknown error occurred during the job.", ur.throwable)) - } -} - -case class RetryInfo(overrideConf: java.util.Map[String, String], throwable: Throwable) { - override def toString: String = { - s"""RetryInfo{ - | overrideConf : $overrideConf, - | throwable : ${ThrowableUtil.stackTraceToString(throwable)} - |}""".stripMargin - } -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala deleted file mode 100644 index 1837b92fa0..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.application - -import io.kyligence.kap.engine.spark.job.ParamsConstants -import org.apache.commons.lang.StringUtils -import org.apache.commons.lang3.exception.ExceptionUtils -import org.apache.kylin.common.util.{JsonUtil, Unsafe} -import org.apache.kylin.engine.spark.application.SparkApplication -import org.apache.kylin.engine.spark.job.KylinBuildEnv -import org.apache.kylin.engine.spark.scheduler._ -import org.apache.spark.internal.Logging -import org.apache.spark.scheduler.KylinJobEventLoop - -import java.util -import java.util.concurrent.CountDownLatch - -/** - * Spark driver part, construct the real spark job [SparkApplication] - */ -object JobWorkSpace extends Logging { - def execute(args: Array[String]): Unit = { - try { - val (application, appArgs) = resolveArgs(args) - val eventLoop = new KylinJobEventLoop - val worker = new JobWorker(application, appArgs, eventLoop) - val monitor = new JobMonitor(eventLoop) - val workspace = new JobWorkSpace(eventLoop, monitor, worker) - val statusCode = workspace.run() - if (statusCode != 0) { - Unsafe.systemExit(statusCode) - } - } catch { - case throwable: Throwable => - logError("Error occurred when init job workspace.", throwable) - Unsafe.systemExit(1) - } - } - - def resolveArgs(args: Array[String]): (SparkApplication, Array[String]) = { - if (args.length < 2 || args(0) != "-className") throw new IllegalArgumentException("className is required") - val className = args(1) - // scalastyle:off - val o = Class.forName(className).newInstance - // scalastyle:on - if (!o.isInstanceOf[SparkApplication]) throw new IllegalArgumentException(className + " is not a subClass of AbstractApplication") - val appArgs = args.slice(2, args.length) - val application = o.asInstanceOf[SparkApplication] - (application, appArgs) - } -} - -class JobWorkSpace(eventLoop: KylinJobEventLoop, monitor: JobMonitor, worker: JobWorker) extends Logging { - require(eventLoop != null) - require(monitor != null) - require(worker != null) - - private var statusCode: Int = 0 - private val latch = new CountDownLatch(1) - - eventLoop.registerListener(new KylinJobListener { - override def onReceive(event: KylinJobEvent): Unit = { - event match { - case _: JobSucceeded => success() - case jf: JobFailed => fail(jf) - case _ => - } - } - }) - - def run(): Int = { - eventLoop.start() - eventLoop.post(RunJob()) - latch.await() - statusCode - } - - def success(): Unit = { - try { - stop() - } finally { - statusCode = 0 - latch.countDown() - } - } - - def fail(jf: JobFailed): Unit = { - try { - logError(s"Job failed eventually. 
Reason: ${jf.reason}", jf.throwable) - KylinBuildEnv.get().buildJobInfos.recordJobRetryInfos(RetryInfo(new util.HashMap, jf.throwable)) - updateJobErrorInfo(jf) - stop() - } finally { - statusCode = 1 - latch.countDown() - } - } - - def stop(): Unit = { - monitor.stop() - worker.stop() - eventLoop.stop() - } - - def updateJobErrorInfo(jf: JobFailed): Unit = { - val infos = KylinBuildEnv.get().buildJobInfos - val context = worker.getApplication - - val project = context.getProject - val jobId = context.getJobId - - val stageId = infos.getStageId - val jobStepId = StringUtils.replace(infos.getJobStepId, SparkApplication.JOB_NAME_PREFIX, "") - val failedStepId = if (StringUtils.isBlank(stageId)) jobStepId else stageId - - val failedSegmentId = infos.getSegmentId - val failedStack = ExceptionUtils.getStackTrace(jf.throwable) - val failedReason = - if (context.getAtomicUnreachableSparkMaster.get()) "Unable connect spark master to reach timeout maximum time" - else jf.reason - val url = "/kylin/api/jobs/error" - - val payload: util.HashMap[String, Object] = new util.HashMap[String, Object](5) - payload.put("project", project) - payload.put("job_id", jobId) - payload.put("failed_step_id", failedStepId) - payload.put("failed_segment_id", failedSegmentId) - payload.put("failed_stack", failedStack) - payload.put("failed_reason", failedReason) - val json = JsonUtil.writeValueAsString(payload) - val params = new util.HashMap[String, String]() - val config = KylinBuildEnv.get().kylinConfig - params.put(ParamsConstants.TIME_OUT, config.getUpdateJobInfoTimeout.toString) - params.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true)) - context.getReport.updateSparkJobInfo(params, url, json); - } -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala deleted file mode 100644 index 069ac171cc..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.application - -import java.util.concurrent.Executors - -import org.apache.kylin.engine.spark.application.SparkApplication -import org.apache.kylin.engine.spark.scheduler._ -import org.apache.spark.internal.Logging -import org.apache.spark.scheduler.KylinJobEventLoop - -class JobWorker(application: SparkApplication, args: Array[String], eventLoop: KylinJobEventLoop) extends Logging { - private val pool = Executors.newSingleThreadExecutor() - - def getApplication: SparkApplication = application - - eventLoop.registerListener(new KylinJobListener { - override def onReceive(event: KylinJobEvent): Unit = { - event match { - case _: RunJob => runJob() - case _ => - } - } - }) - - def stop(): Unit = { - pool.shutdownNow() - application.logJobInfo() - } - - private def runJob(): Unit = { - execute() - } - - - private def execute(): Unit = { - pool.execute(new Runnable { - override def run(): Unit = { - try { - application.execute(args) - eventLoop.post(JobSucceeded()) - } catch { - case exception: NoRetryException => eventLoop.post(UnknownThrowable(exception)) - case throwable: Throwable => eventLoop.post(ResourceLack(throwable)) - } - } - }) - } -} - -class NoRetryException(msg: String) extends java.lang.Exception(msg) { - def this() { - this(null) - } -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala deleted file mode 100644 index 5cefd24054..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy.master - -import java.util - -import org.apache.kylin.cluster.{AvailableResource, IClusterManager, ResourceInfo} -import org.apache.kylin.common.KylinConfig -import org.apache.spark.deploy.DeployMessages.{KillApplication, MasterStateResponse, RequestMasterState} -import org.apache.spark.deploy.master.StandaloneClusterManager.masterEndpoints -import org.apache.spark.internal.Logging -import org.apache.spark.rpc.{RpcAddress, RpcEnv} -import org.apache.spark.sql.SparkSession -import org.apache.spark.util.Utils -import org.apache.spark.{SecurityManager, SparkConf} - -// scalastyle:off -class StandaloneClusterManager extends IClusterManager with Logging { - - private val JOB_STEP_PREFIX = "job_step_" - - override def fetchMaximumResourceAllocation: ResourceInfo = { - val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState) - val aliveWorkers = state.workers.filter(_.state == WorkerState.ALIVE) - val availableMem = aliveWorkers.map(_.memoryFree).sum - val availableCores = aliveWorkers.map(_.coresFree).sum - logInfo(s"Get available resource, " + - s"availableMem: $availableMem, availableCores: $availableCores") - ResourceInfo(availableMem, availableCores) - } - - override def fetchQueueAvailableResource(queueName: String): AvailableResource = { - val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState) - val aliveWorkers = state.workers.filter(_.state == WorkerState.ALIVE) - val availableMem = aliveWorkers.map(_.memoryFree).sum - val availableCores = aliveWorkers.map(_.coresFree).sum - val totalMem = aliveWorkers.map(_.memory).sum - val totalCores = aliveWorkers.map(_.cores).sum - logInfo(s"Get available resource, " + - s"availableMem: $availableMem, availableCores: $availableCores, " + - s"totalMem: $totalMem, totalCores: $totalCores") - AvailableResource(ResourceInfo(availableMem, availableCores), ResourceInfo(totalMem, totalCores)) - } - - override def getBuildTrackingUrl(sparkSession: SparkSession): String = { - val applicationId = sparkSession.sparkContext.applicationId - logInfo(s"Get tracking url of application $applicationId") - val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState) - val app = state.activeApps.find(_.id == applicationId).orNull - if (app == null) { - logInfo(s"No active application found of applicationId $applicationId") - return null - } - app.desc.appUiUrl - } - - override def killApplication(jobStepId: String): Unit = { - killApplication(s"$JOB_STEP_PREFIX", jobStepId) - } - - override def killApplication(jobStepPrefix: String, jobStepId: String): Unit = { - val master = masterEndpoints(0) - val state = master.askSync[MasterStateResponse](RequestMasterState) - val app = state.activeApps.find(_.desc.name.equals(jobStepPrefix + jobStepId)).orNull - if (app == null) { - logInfo(s"No active application found of jobStepId $jobStepId") - return - } - logInfo(s"Kill application ${app.id} by jobStepId $jobStepId") - master.send(KillApplication(app.id)) - } - - override def getRunningJobs(queues: util.Set[String]): util.List[String] = { - val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState) - val jobStepNames = state.activeApps.map(_.desc.name) - logInfo(s"Get running jobs ${jobStepNames.toSeq}") - import scala.collection.JavaConverters._ - jobStepNames.toList.asJava - } - - override def fetchQueueStatistics(queueName: String): ResourceInfo = { - fetchMaximumResourceAllocation - } - - override def isApplicationBeenKilled(jobStepId: String): 
Boolean = { - val master = masterEndpoints(0) - val state = master.askSync[MasterStateResponse](RequestMasterState) - val app = state.completedApps.find(_.desc.name.equals(s"$JOB_STEP_PREFIX$jobStepId")).orNull - if (app == null) { - false - } else { - "KILLED".equals(app.state.toString) - } - } - - override def applicationExisted(jobId: String): Boolean = { - val master = masterEndpoints(0) - val state = master.askSync[MasterStateResponse](RequestMasterState) - val app = state.activeApps.find(_.desc.name.equalsIgnoreCase(jobId)).orNull - if (app == null) { - false - } else { - true - } - } - -} - -object StandaloneClusterManager extends Logging { - - private val ENDPOINT_NAME = "Master" - private val CLIENT_NAME = "kylinStandaloneClient" - private val SPARK_LOCAL = "local" - private val SPARK_MASTER = "spark.master" - private val SPARK_RPC_TIMEOUT = "spark.rpc.askTimeout" - private val SPARK_AUTHENTICATE = "spark.authenticate" - private val SPARK_AUTHENTICATE_SECRET = "spark.authenticate.secret" - private val SPARK_NETWORK_CRYPTO_ENABLED = "spark.network.crypto.enabled" - - private lazy val masterEndpoints = { - val overrideConfig = KylinConfig.getInstanceFromEnv.getSparkConfigOverride - val conf = new SparkConf() - if (!conf.contains(SPARK_MASTER) || conf.get(SPARK_MASTER).startsWith(SPARK_LOCAL)) { - conf.set(SPARK_MASTER, overrideConfig.get(SPARK_MASTER)) - } - if (!conf.contains(SPARK_RPC_TIMEOUT)) { - conf.set(SPARK_RPC_TIMEOUT, "10s") - } - if (overrideConfig.containsKey(SPARK_AUTHENTICATE) && "true".equals(overrideConfig.get(SPARK_AUTHENTICATE))) { - conf.set(SPARK_AUTHENTICATE, "true") - conf.set(SPARK_AUTHENTICATE_SECRET, overrideConfig.getOrDefault(SPARK_AUTHENTICATE_SECRET, "kylin")) - conf.set(SPARK_NETWORK_CRYPTO_ENABLED, overrideConfig.getOrDefault(SPARK_NETWORK_CRYPTO_ENABLED, "true")) - } - logInfo(s"Spark master ${conf.get(SPARK_MASTER)}") - val rpcEnv = RpcEnv.create(CLIENT_NAME, Utils.localHostName(), 0, conf, new SecurityManager(conf)) - val masterUrls = conf.get(SPARK_MASTER) - Utils.parseStandaloneMasterUrls(masterUrls) - .map(RpcAddress.fromSparkURL) - .map(rpcEnv.setupEndpointRef(_, ENDPOINT_NAME)) - } -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala deleted file mode 100644 index 38f1554a01..0000000000 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.scheduler - -import java.util -import java.util.{List => JList} - -import org.apache.kylin.engine.spark.scheduler.{KylinJobEvent, KylinJobListener} -import org.apache.spark.util.EventLoop - -import scala.collection.JavaConverters._ - -class KylinJobEventLoop extends EventLoop[KylinJobEvent]("spark-entry-event-loop") { - // register listener in single thread, so LinkedList is enough - private var listeners: JList[KylinJobListener] = new util.LinkedList[KylinJobListener]() - - def registerListener(listener: KylinJobListener): Boolean = { - listeners.add(listener) - } - - def unregisterListener(listener: KylinJobListener): Boolean = { - listeners.remove(listener) - } - - override protected def onReceive(event: KylinJobEvent): Unit = { - listeners.asScala.foreach(_.onReceive(event)) - } - - override protected def onError(e: Throwable): Unit = { - - } -}
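
For reference, the removed ClusterManagerFactory wrapped every IClusterManager implementation in a Guava SimpleTimeLimiter proxy so that a hung YARN or Kubernetes call fails fast instead of blocking the build driver. The sketch below illustrates only that wrapping pattern under simplifying assumptions: the ClusterManager trait, method, and timings are hypothetical stand-ins for Kylin's IClusterManager, and it uses the plain com.google.common Guava classes rather than Kylin's shaded copy.

import java.util.concurrent.{Executors, TimeUnit}
import com.google.common.util.concurrent.{SimpleTimeLimiter, UncheckedTimeoutException}

// Illustrative stand-in for org.apache.kylin.cluster.IClusterManager.
trait ClusterManager {
  def fetchMaximumResourceAllocation: String
}

object TimeLimitedClusterManagerSketch {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newCachedThreadPool()

    // Pretend implementation whose call to the resource manager hangs.
    val target: ClusterManager = new ClusterManager {
      override def fetchMaximumResourceAllocation: String = {
        Thread.sleep(5000) // simulate a stuck YARN/K8s request
        "memory=8192MB, vCores=16"
      }
    }

    // Every method invoked through the proxy runs on the pool and is
    // abandoned with an UncheckedTimeoutException once the threshold passes,
    // mirroring how the factory bounded cluster-manager calls.
    val limiter = SimpleTimeLimiter.create(pool)
    val proxy = limiter.newProxy(target, classOf[ClusterManager], 2L, TimeUnit.SECONDS)

    try {
      println(proxy.fetchMaximumResourceAllocation)
    } catch {
      case _: UncheckedTimeoutException =>
        println("cluster manager did not answer within 2 seconds")
    } finally {
      pool.shutdownNow()
    }
  }
}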