This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push: new d8974e8525 KYLIN-5217 Create a initial commit d8974e8525 is described below commit d8974e85252f7dd6732293beb6387d277df56057 Author: longfei.jiang <longfei.ji...@kyligence.io> AuthorDate: Tue Aug 9 14:22:39 2022 +0800 KYLIN-5217 Create a initial commit --- .../kylin/cluster/ClusterManagerFactory.scala | 37 ++++ .../apache/kylin/cluster/K8sClusterManager.scala | 65 +++++++ .../kylin/cluster/SchedulerInfoCmdHelper.scala | 167 +++++++++++++++++ .../kylin/cluster/StandaloneClusterManager.scala | 22 +++ .../apache/kylin/cluster/YarnClusterManager.scala | 206 +++++++++++++++++++++ .../cluster/parser/CapacitySchedulerParser.scala | 92 +++++++++ .../kylin/cluster/parser/FairSchedulerParser.scala | 64 +++++++ .../kylin/cluster/parser/SchedulerParser.scala | 43 +++++ .../cluster/parser/SchedulerParserFactory.scala | 43 +++++ .../engine/spark/scheduler/ClusterMonitor.scala | 82 ++++++++ .../kylin/engine/spark/scheduler/JobRuntime.scala | 59 ++++++ .../engine/spark/scheduler/KylinJobEvent.scala | 44 +++++ .../engine/spark/scheduler/KylinJobListener.scala | 23 +++ .../org/apache/spark/application/JobMonitor.scala | 94 ++++++++++ .../apache/spark/application/JobWorkSpace.scala | 153 +++++++++++++++ .../org/apache/spark/application/JobWorker.scala | 71 +++++++ .../deploy/master/StandaloneClusterManager.scala | 157 ++++++++++++++++ .../apache/spark/scheduler/KylinJobEventLoop.scala | 48 +++++ 18 files changed, 1470 insertions(+) diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala new file mode 100644 index 0000000000..6009232f57 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/ClusterManagerFactory.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.cluster + +import java.util.concurrent.{ExecutorService, Executors, TimeUnit} +import io.kyligence.kap.guava20.shaded.common.util.concurrent.SimpleTimeLimiter +import org.apache.kylin.common.KylinConfig +import org.apache.spark.util.KylinReflectUtils + + +object ClusterManagerFactory { + + val pool: ExecutorService = Executors.newCachedThreadPool + + def create(kylinConfig: KylinConfig): IClusterManager = { + val timeLimiter = SimpleTimeLimiter.create(pool) + val target = KylinReflectUtils.createObject(kylinConfig.getClusterManagerClassName)._1.asInstanceOf[IClusterManager] + + timeLimiter.newProxy(target, classOf[IClusterManager], kylinConfig.getClusterManagerTimeoutThreshold, TimeUnit.SECONDS); + } +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala new file mode 100644 index 0000000000..8711622885 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/K8sClusterManager.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.cluster + +import java.util + +import com.google.common.collect.Lists +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession + +class K8sClusterManager extends IClusterManager with Logging { + override def fetchMaximumResourceAllocation: ResourceInfo = { + ResourceInfo(Int.MaxValue, 1000) + } + + override def fetchQueueAvailableResource(queueName: String): AvailableResource = { + AvailableResource(ResourceInfo(Int.MaxValue, 1000), ResourceInfo(Int.MaxValue, 1000)) + } + + override def getBuildTrackingUrl(sparkSession: SparkSession): String = { + logInfo("Get Build Tracking Url!") + val applicationId = sparkSession.sparkContext.applicationId + "" + } + + override def killApplication(jobStepId: String): Unit = { + logInfo(s"Kill Application $jobStepId !") + } + + override def killApplication(jobStepPrefix: String, jobStepId: String): Unit = { + logInfo(s"Kill Application $jobStepPrefix $jobStepId !") + } + + override def isApplicationBeenKilled(applicationId: String): Boolean = { + true + } + + override def getRunningJobs(queues: util.Set[String]): util.List[String] = { + Lists.newArrayList() + } + + override def fetchQueueStatistics(queueName: String): ResourceInfo = { + ResourceInfo(Int.MaxValue, 1000) + } + + override def applicationExisted(jobId: String): Boolean = { + false + } + +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala new file mode 100644 index 0000000000..95c35a8a75 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/SchedulerInfoCmdHelper.scala @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.cluster + +import org.apache.kylin.engine.spark.utils.StorageUtils +import io.netty.util.internal.ThrowableUtil +import org.apache.hadoop.yarn.conf.{HAUtil, YarnConfiguration} +import org.apache.kylin.common.util.{JsonUtil, ShellException} +import org.apache.spark.internal.Logging + +import java.io.{BufferedReader, InputStreamReader} +import java.nio.charset.StandardCharsets + +object SchedulerInfoCmdHelper extends Logging { + private val useHttps: Boolean = YarnConfiguration.useHttps(StorageUtils.getCurrentYarnConfiguration) + + def schedulerInfo: String = { + val cmds = getSocketAddress.map(address => genCmd(address._1, address._2)) + getInfoByCmds(cmds) + } + + def metricsInfo: String = { + val cmds = getSocketAddress.map(address => genMetricsCmd(address._1, address._2)) + getInfoByCmds(cmds) + } + + private[cluster] def getInfoByCmds(cmds: Iterable[String]): String = { + val results = cmds.map { cmd => + try { + logInfo(s"Executing the command: ${cmd}") + execute(cmd) + } catch { + case throwable: Throwable => (1, ThrowableUtil.stackTraceToString(throwable)) + } + } + val tuples = results.filter(result => result._1 == 0 && JsonUtil.isJson(result._2)) + if (tuples.isEmpty) { + val errors = results.map(_._2).mkString("\n") + logWarning(s"Error occurred when getting scheduler info from cmd $cmds") + throw new RuntimeException(errors) + } else { + require(tuples.size == 1) + tuples.head._2 + } + } + + private[cluster] def getSocketAddress: Map[String, Int] = { + val conf = StorageUtils.getCurrentYarnConfiguration + val addresses = if (HAUtil.isHAEnabled(conf)) { + val haIds = HAUtil.getRMHAIds(conf).toArray + require(haIds.nonEmpty, "Ha ids is empty, please check your yarn-site.xml.") + if (useHttps) { + haIds.map(id => conf.getSocketAddr(s"${YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS}.$id", + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_HTTPS_PORT)) + } else { + haIds.map(id => conf.getSocketAddr(s"${YarnConfiguration.RM_WEBAPP_ADDRESS}.$id", + YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_PORT)) + } + } else { + if (useHttps) { + Array(conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_HTTPS_PORT)) + } else { + Array(conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_PORT)) + } + } + addresses.map(address => address.getHostName -> address.getPort).toMap + } + + private[cluster] def genCmd(hostName: String, port: Int): String = { + val uri = if (useHttps) { + s"https://$hostName:$port/ws/v1/cluster/scheduler" + } else { + s"http://$hostName:$port/ws/v1/cluster/scheduler" + } + s"""curl -k --negotiate -u : "$uri"""" + } + + private[cluster] def genMetricsCmd(hostName: String, port: Int): String = { + val uri = if (useHttps) { + s"https://$hostName:$port/ws/v1/cluster/metrics" + } else { + s"http://$hostName:$port/ws/v1/cluster/metrics" + } + s"""curl -k --negotiate -u : "$uri"""" + } + + /** + * Only returns stdout after executing the command. + * + * @param command the shell command to execute + * @return a tuple of (exit code, stdout) + */ + private[cluster] def execute(command: String): (Int, String) = { + try { + val cmd = new Array[String](3) + val osName = System.getProperty("os.name") + if (osName.startsWith("Windows")) { + cmd(0) = "cmd.exe" + cmd(1) = "/C" + } + else { + cmd(0) = "/bin/bash" + cmd(1) = "-c" + } + cmd(2) = command + val builder = new 
ProcessBuilder(cmd: _*) + builder.environment().putAll(System.getenv()) + val proc = builder.start + val resultStdout = new StringBuilder + val inReader = new BufferedReader(new InputStreamReader(proc.getInputStream, StandardCharsets.UTF_8.name())) + val newLine = System.getProperty("line.separator") + var line: String = inReader.readLine() + while (line != null) { + resultStdout.append(line).append(newLine) + line = inReader.readLine() + } + + val stderr = new StringBuilder + val errorReader = new BufferedReader(new InputStreamReader(proc.getErrorStream, StandardCharsets.UTF_8.name())) + line = errorReader.readLine() + while (line != null) { + stderr.append(line).append(newLine) + line = errorReader.readLine() + } + + logInfo(s"The corresponding http response for the above command: \n ${stderr}") + try { + val exitCode = proc.waitFor + if (exitCode != 0) { + logError(s"executing command $command; exit code: $exitCode") + logError(s"==========================[stderr]===============================") + logError(stderr.toString) + logError(s"==========================[stderr]===============================") + + logError(s"==========================[stdout]===============================") + logError(resultStdout.toString) + logError(s"==========================[stdout]===============================") + } + (exitCode, resultStdout.toString) + } catch { + case e: InterruptedException => + Thread.currentThread.interrupt() + throw e + } + } catch { + case e: Exception => throw new ShellException(e) + } + } +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala new file mode 100644 index 0000000000..c3737a9090 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/StandaloneClusterManager.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cluster + +class StandaloneClusterManager extends org.apache.spark.deploy.master.StandaloneClusterManager { +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala new file mode 100644 index 0000000000..c3f6275a0a --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/YarnClusterManager.scala @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cluster + +import java.io.IOException +import java.util +import java.util.stream.Collectors +import java.util.{ArrayList, EnumSet, List, Set} + +import com.google.common.collect.{Lists, Sets} +import org.apache.kylin.cluster.parser.SchedulerParserFactory +import org.apache.kylin.engine.spark.utils.StorageUtils +import org.apache.commons.collections.CollectionUtils +import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, QueueInfo, YarnApplicationState} +import org.apache.hadoop.yarn.client.api.YarnClient +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.exceptions.YarnException +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession + +import scala.collection.JavaConverters._ + +class YarnClusterManager extends IClusterManager with Logging { + + import org.apache.kylin.cluster.YarnClusterManager._ + + private lazy val maximumResourceAllocation: ResourceInfo = { + withYarnClient(yarnClient => { + try { + val response = yarnClient.createApplication().getNewApplicationResponse + val cap = response.getMaximumResourceCapability + val resourceInfo = ResourceInfo(cap.getMemory, cap.getVirtualCores) + logInfo(s"Cluster maximum resource allocation $resourceInfo") + resourceInfo + } catch { + case throwable: Throwable => + logError("Error occurred when get resource upper limit per container.", throwable) + throw throwable + } + }) + } + + override def fetchMaximumResourceAllocation: ResourceInfo = maximumResourceAllocation + + override def fetchQueueAvailableResource(queueName: String): AvailableResource = { + val info = SchedulerInfoCmdHelper.schedulerInfo + val parser = SchedulerParserFactory.create(info) + parser.parse(info) + parser.availableResource(queueName) + } + + def getYarnApplicationReport(applicationId: String): ApplicationReport = { + withYarnClient(yarnClient => { + val array = applicationId.split("_") + if (array.length < 3) return null + val appId = ApplicationId.newInstance(array(1).toLong, array(2).toInt) + yarnClient.getApplicationReport(appId) + }) + } + + override def getBuildTrackingUrl(sparkSession: SparkSession): String = { + val applicationId = sparkSession.sparkContext.applicationId + val applicationReport = getYarnApplicationReport(applicationId) + if (null == applicationReport) null + else applicationReport.getTrackingUrl + } + + def killApplication(jobStepId: String): Unit = { + killApplication("job_step_", jobStepId) + } + + def killApplication(jobStepPrefix: String, jobStepId: String): Unit = { + withYarnClient(yarnClient => { + var orphanApplicationId: String = null + try { + val types: Set[String] = Sets.newHashSet("SPARK") + val states: EnumSet[YarnApplicationState] = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, + YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING) + val applicationReports: 
List[ApplicationReport] = yarnClient.getApplications(types, states) + if (CollectionUtils.isEmpty(applicationReports)) return + import scala.collection.JavaConverters._ + for (report <- applicationReports.asScala) { + if (report.getName.equalsIgnoreCase(jobStepPrefix + jobStepId)) { + orphanApplicationId = report.getApplicationId.toString + yarnClient.killApplication(report.getApplicationId) + logInfo(s"kill orphan yarn application $orphanApplicationId succeed, job step $jobStepId") + } + } + } catch { + case ex@(_: YarnException | _: IOException) => + logError(s"kill orphan yarn application $orphanApplicationId failed, job step $jobStepId", ex) + } + }) + } + + def getRunningJobs(queues: Set[String]): List[String] = { + withYarnClient(yarnClient => { + if (queues.isEmpty) { + val applications = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING)) + if (null == applications) new ArrayList[String]() + else applications.asScala.map(_.getName).asJava + } else { + val runningJobs: List[String] = new ArrayList[String]() + for (queue <- queues.asScala) { + val applications = yarnClient.getQueueInfo(queue).getApplications + if (null != applications) { + applications.asScala.filter(_.getYarnApplicationState == YarnApplicationState.RUNNING).map(_.getName).map(runningJobs.add) + } + } + runningJobs + } + }) + } + + override def fetchQueueStatistics(queueName: String): ResourceInfo = { + withYarnClient(yarnClient => { + val qs = yarnClient.getQueueInfo(queueName).getQueueStatistics + ResourceInfo(qs.getAvailableMemoryMB.toInt, qs.getAvailableVCores.toInt) + }) + } + + override def isApplicationBeenKilled(jobStepId: String): Boolean = { + withYarnClient(yarnClient => { + val types: Set[String] = Sets.newHashSet("SPARK") + val states: EnumSet[YarnApplicationState] = EnumSet.of(YarnApplicationState.KILLED) + val applicationReports: List[ApplicationReport] = yarnClient.getApplications(types, states) + if (!CollectionUtils.isEmpty(applicationReports)) { + import scala.collection.JavaConverters._ + for (report <- applicationReports.asScala) { + if (report.getName.equalsIgnoreCase("job_step_" + jobStepId)) { + return true + } + } + } + false + }) + } + + def applicationExisted(jobId: String): Boolean = { + withYarnClient(yarnClient => { + val types: util.Set[String] = Sets.newHashSet("SPARK") + val states: util.EnumSet[YarnApplicationState] = util.EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, + YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING) + val applicationReports: util.List[ApplicationReport] = yarnClient.getApplications(types, states) + if (!CollectionUtils.isEmpty(applicationReports)) { + import scala.collection.JavaConverters._ + for (report <- applicationReports.asScala) { + if (report.getName.equalsIgnoreCase(jobId)) { + return true + } + } + } + return false + }) + } + + def listQueueNames(): util.List[String] = { + withYarnClient(yarnClient => { + val queues: util.List[QueueInfo] = yarnClient.getAllQueues + val queueNames: util.List[String] = Lists.newArrayList() + for (queue <- queues.asScala) { + queueNames.add(queue.getQueueName) + } + queueNames + }) + } + +} + +object YarnClusterManager { + + def withYarnClient[T](body: YarnClient => T): T = { + val yarnClient = YarnClient.createYarnClient + yarnClient.init(getSpecifiedConf) + yarnClient.start() + try { + body(yarnClient) + } finally { + yarnClient.close() + } + } + + private def getSpecifiedConf: YarnConfiguration = { + // yarn cluster mode couldn't access 
local hadoop_conf_dir + val yarnConf = StorageUtils.getCurrentYarnConfiguration + // https://issues.apache.org/jira/browse/SPARK-15343 + yarnConf.set("yarn.timeline-service.enabled", "false") + yarnConf + } +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala new file mode 100644 index 0000000000..e22700c7e7 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/CapacitySchedulerParser.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cluster.parser + +import java.util.{List => JList} +import com.fasterxml.jackson.databind.JsonNode +import org.apache.kylin.cluster.{AvailableResource, ResourceInfo} +import org.apache.kylin.engine.spark.application.SparkApplication +import org.apache.kylin.engine.spark.job.KylinBuildEnv + +import scala.collection.JavaConverters._ + +class CapacitySchedulerParser extends SchedulerParser { + + override def availableResource(queueName: String): AvailableResource = { + val queues: JList[JsonNode] = root.findParents("queueName") + val nodes = queues.asScala.filter(queue => parseValue(queue.get("queueName")).equals(queueName)) + require(nodes.size == 1) + + var (queueAvailable, queueMax) = queueCapacity(nodes.head) + val totalResource = calTotalResource(nodes.head) + val clusterNode = root.findValue("schedulerInfo") + val cluster = clusterAvailableCapacity(clusterNode) + var min = Math.min(queueAvailable, cluster) + logInfo(s"queueAvailable is ${queueAvailable}, min is ${min}, queueMax is ${queueMax}") + if (KylinBuildEnv.get().kylinConfig.useDynamicResourcePlan() && queueMax == 0.0) { + logInfo("configure yarn queue using dynamic resource plan in capacity scheduler") + queueMax = 1.0 + } + if (KylinBuildEnv.get().kylinConfig.useDynamicResourcePlan() && min == 0.0) { + logInfo("configure yarn queue using dynamic resource plan in capacity scheduler") + min = 1.0 + } + var resource = AvailableResource(totalResource.percentage(min), totalResource.percentage(queueMax)) + try { + val queueAvailableRes = KylinBuildEnv.get.clusterManager.fetchQueueStatistics(queueName) + resource = AvailableResource(queueAvailableRes, totalResource.percentage(queueMax)) + } catch { + case e: Error => + logInfo(s"The current hadoop version does not support QueueInfo.getQueueStatistics method.") + None + } + + logInfo(s"Capacity actual available resource: $resource.") + resource + } + + private def clusterAvailableCapacity(node: JsonNode): Double = { + val max = parseValue(node.get("capacity")).toDouble + val used = parseValue(node.get("usedCapacity")).toDouble + val 
capacity = (max - used) / 100 + logInfo(s"Cluster available capacity: $capacity.") + capacity + } + + private def queueCapacity(node: JsonNode): (Double, Double) = { + val max = parseValue(node.get("absoluteMaxCapacity")).toDouble + val used = parseValue(node.get("absoluteUsedCapacity")).toDouble + val available = (max - used) / 100 + logInfo(s"Queue available capacity: $available.") + (available, max / 100) + } + + private def calTotalResource(node: JsonNode): ResourceInfo = { + val usedMemory = parseValue(node.get("resourcesUsed").get("memory")).toInt + if (usedMemory != 0) { + val usedCapacity = parseValue(node.get("absoluteUsedCapacity")).toDouble / 100 + val resource = ResourceInfo(Math.floor(usedMemory / usedCapacity).toInt, Int.MaxValue) + logInfo(s"Estimate total cluster resource is $resource.") + resource + } else { + logInfo("Current queue used memory is 0, seem available resource as infinite.") + ResourceInfo(Int.MaxValue, Int.MaxValue) + } + } +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala new file mode 100644 index 0000000000..f2c9125191 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/FairSchedulerParser.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.cluster.parser + +import java.util.{List => JList} + +import com.fasterxml.jackson.databind.JsonNode +import org.apache.kylin.cluster.{AvailableResource, ResourceInfo} + +import scala.collection.JavaConverters._ + +class FairSchedulerParser extends SchedulerParser { + + override def availableResource(queueName: String): AvailableResource = { + val cluster = clusterAvailableResource() + val queue = queueAvailableResource(queueName) + val resource = AvailableResource(cluster.available.reduceMin(queue.available), queue.max) + logInfo(s"Current actual available resource: $resource.") + resource + } + + private def queueAvailableResource(queueName: String): AvailableResource = { + val queues: JList[JsonNode] = root.findParents("queueName") + val nodes = queues.asScala.filter(queue => parseValue(queue.get("queueName")).equals(queueName)) + require(nodes.size == 1) + val resource = calAvailableResource(nodes.head) + logInfo(s"Queue available resource: $resource.") + resource + } + + private def clusterAvailableResource(): AvailableResource = { + val node = root.findValue("rootQueue") + require(node != null) + val resource = calAvailableResource(node) + logInfo(s"Cluster available resource: $resource.") + resource + } + + private def calAvailableResource(node: JsonNode): AvailableResource = { + val maxResources = node.get("maxResources") + val maxMemory = parseValue(maxResources.get("memory")).toInt + val maxCores = parseValue(maxResources.get("vCores")).toInt + val usedResources = node.get("usedResources") + val usedMemory = parseValue(usedResources.get("memory")).toInt + val usedCores = parseValue(usedResources.get("vCores")).toInt + AvailableResource(ResourceInfo(maxMemory - usedMemory, maxCores - usedCores), ResourceInfo(maxMemory, maxCores)) + } +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala new file mode 100644 index 0000000000..9db043abc7 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParser.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.cluster.parser + +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} +import org.apache.kylin.cluster.AvailableResource +import org.apache.spark.internal.Logging + +trait SchedulerParser extends Logging { + protected var root: JsonNode = _ + protected lazy val mapper = new ObjectMapper + + def availableResource(queueName: String): AvailableResource + + def parse(schedulerInfo: String): Unit = { + this.root = mapper.readTree(schedulerInfo) + } + + // value in some scheduler info format to "value" + protected def parseValue(node: JsonNode): String = { + if (node.toString.startsWith("\"")) { + node.toString.replace("\"", "") + } else { + node.toString + } + } +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala new file mode 100644 index 0000000000..315ae3155d --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/cluster/parser/SchedulerParserFactory.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cluster.parser + +import org.apache.kylin.common.util.JsonUtil +import org.apache.spark.internal.Logging + +object SchedulerParserFactory extends Logging{ + def create(info: String): SchedulerParser = { + try { + val schedulerType = JsonUtil.readValueAsTree(info).findValue("type").toString + // call contains rather than equals cause of value is surrounded with ", for example: {"type":"fairScheduler"} + // do not support FifoScheduler for now + if (schedulerType.contains("capacityScheduler")) { + new CapacitySchedulerParser + } else if (schedulerType.contains("fairScheduler")) { + new FairSchedulerParser + } else { + throw new IllegalArgumentException(s"Unsupported scheduler type from scheduler info. $schedulerType") + } + } catch { + case throwable: Throwable => + logError(s"Invalid scheduler info. $info") + throw throwable + } + } +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala new file mode 100644 index 0000000000..d9d80f4298 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/ClusterMonitor.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark.scheduler + +import org.apache.kylin.common.KapConfig +import org.apache.kylin.engine.spark.job.KylinBuildEnv +import org.apache.kylin.engine.spark.utils.ThreadUtils +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession + +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference} + + +class ClusterMonitor extends Logging { + private lazy val scheduler = // + ThreadUtils.newDaemonSingleThreadScheduledExecutor("connect-master-guard") + private val JOB_STEP_PREFIX = "job_step_" + + def scheduleAtFixedRate(func: () => Unit, period: Long): Unit = { + scheduler.scheduleAtFixedRate(new Runnable { + override def run(): Unit = func.apply() + }, period, period, TimeUnit.SECONDS) + } + + def monitorSparkMaster(atomicBuildEnv: AtomicReference[KylinBuildEnv], atomicSparkSession: AtomicReference[SparkSession], + disconnectTimes: AtomicLong, atomicUnreachableSparkMaster: AtomicBoolean): Unit = { + val config = atomicBuildEnv.get().kylinConfig + if (KapConfig.wrap(config).isCloud && !config.isUTEnv) { + val disconnectMaxTimes = config.getClusterManagerHealthCheckMaxTimes + if (disconnectMaxTimes >= 0) { + val connectPeriod = config.getClusterManagerHealCheckIntervalSecond + logDebug(s"ClusterMonitor thread start with max times is $disconnectMaxTimes period is $connectPeriod") + scheduleAtFixedRate(() => { + monitor(atomicBuildEnv, atomicSparkSession, disconnectTimes, atomicUnreachableSparkMaster) + }, connectPeriod) + } + } + } + + def monitor(atomicBuildEnv: AtomicReference[KylinBuildEnv], atomicSparkSession: AtomicReference[SparkSession], + disconnectTimes: AtomicLong, atomicUnreachableSparkMaster: AtomicBoolean): Unit = { + logDebug("monitor start") + val config = atomicBuildEnv.get().kylinConfig + val disconnectMaxTimes = config.getClusterManagerHealthCheckMaxTimes + try { + val clusterManager = atomicBuildEnv.get.clusterManager + clusterManager.applicationExisted(JOB_STEP_PREFIX + atomicBuildEnv.get().buildJobInfos.getJobStepId) + disconnectTimes.set(0) + atomicUnreachableSparkMaster.set(false) + logDebug("monitor stop") + } catch { + case e: Exception => + logError(s"monitor error with : ${e.getMessage}") + if (disconnectTimes.incrementAndGet() >= disconnectMaxTimes && atomicSparkSession.get() != null) { + logDebug(s"Job will stop: Unable connect spark master to reach timeout maximum time") + atomicUnreachableSparkMaster.set(true) + atomicSparkSession.get().stop() + } + } + } + + def shutdown(): Unit = { + scheduler.shutdownNow() + } +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala new file mode 100644 index 0000000000..ee72d16e51 --- /dev/null +++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/JobRuntime.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark.scheduler + +import java.util.concurrent.TimeUnit + +import org.apache.kylin.engine.spark.utils.ThreadUtils + +class JobRuntime(val maxThreadCount: Int) { + + private lazy val minThreads = 1 + private lazy val maxThreads = Math.max(minThreads, maxThreadCount) + // Maybe we should parameterize nThreads. + private lazy val threadPool = // + ThreadUtils.newDaemonScalableThreadPool("build-thread", // + minThreads, maxThreads, 20, TimeUnit.SECONDS) + + // Drain layout result using single thread. + private lazy val scheduler = // + ThreadUtils.newDaemonSingleThreadScheduledExecutor("build-scheduler") + + def submit(fun: () => Unit): Unit = { + threadPool.submit(new Runnable { + override def run(): Unit = fun.apply() + }) + } + + def scheduleCheckpoint(fun: () => Unit): Unit = { + scheduler.scheduleWithFixedDelay(() => fun.apply(), 30L, 30L, TimeUnit.SECONDS) + } + + def schedule(func: () => Unit, delay: Long, unit: TimeUnit): Unit = { + scheduler.schedule(new Runnable { + override def run(): Unit = func.apply() + }, delay, unit) + } + + def shutdown(): Unit = { + scheduler.shutdownNow() + threadPool.shutdownNow() + } + +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala new file mode 100644 index 0000000000..d0de86c938 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobEvent.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.spark.scheduler + +sealed trait KylinJobEvent + +sealed trait JobStatus extends KylinJobEvent + +case class JobSucceeded() extends JobStatus + +case class JobFailed(reason: String, throwable: Throwable) extends JobStatus + +sealed trait JobEndReason extends KylinJobEvent + +sealed trait JobFailedReason extends JobEndReason + +case class ResourceLack(throwable: Throwable) extends JobFailedReason + +case class ExceedMaxRetry(throwable: Throwable) extends JobFailedReason + +case class UnknownThrowable(throwable: Throwable) extends JobFailedReason + + +sealed trait JobCommand extends KylinJobEvent + +case class RunJob() extends JobCommand + + diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala new file mode 100644 index 0000000000..686a183c32 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/scheduler/KylinJobListener.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark.scheduler + +trait KylinJobListener { + def onReceive(event: KylinJobEvent): Unit +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala new file mode 100644 index 0000000000..98a5d7e5da --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.application + +import org.apache.kylin.engine.spark.job.KylinBuildEnv +import org.apache.kylin.engine.spark.scheduler._ +import io.netty.util.internal.ThrowableUtil +import org.apache.kylin.common.util.Unsafe +import org.apache.spark.autoheal.ExceptionTerminator +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.KylinJobEventLoop + +class JobMonitor(eventLoop: KylinJobEventLoop) extends Logging { + var retryTimes = 0 + eventLoop.registerListener(new KylinJobListener { + override def onReceive(event: KylinJobEvent): Unit = { + event match { + case rl: ResourceLack => handleResourceLack(rl) + case ut: UnknownThrowable => handleUnknownThrowable(ut) + case emr: ExceedMaxRetry => handleExceedMaxRetry(emr) + case _ => + } + } + }) + + def stop(): Unit = { + } + + def handleResourceLack(rl: ResourceLack): Unit = { + try { + if (rl.throwable != null && rl.throwable.getCause != null) { + if (rl.throwable.getCause.isInstanceOf[NoRetryException]) { + eventLoop.post(JobFailed(rl.throwable.getCause.getMessage, rl.throwable.getCause)) + return + } + } + + logInfo(s"handleResourceLack --> ${rl.throwable.getCause}") + val buildEnv = KylinBuildEnv.get() + // if killed, job failed without retry + val jobStepId = buildEnv.buildJobInfos.getJobStepId + if (buildEnv.clusterManager.isApplicationBeenKilled(jobStepId)) { + eventLoop.post(JobFailed(s"Submitted application $jobStepId has been killed.", rl.throwable)) + return + } + retryTimes += 1 + KylinBuildEnv.get().buildJobInfos.recordRetryTimes(retryTimes) + val maxRetry = buildEnv.kylinConfig.getSparkEngineMaxRetryTime + if (retryTimes <= maxRetry) { + logError(s"Job failed $retryTimes time(s). Cause:", rl.throwable) + Unsafe.setProperty("kylin.spark-conf.auto-prior", "false") + ExceptionTerminator.resolveException(rl, eventLoop) + } else { + eventLoop.post(ExceedMaxRetry(rl.throwable)) + } + } catch { + case throwable: Throwable => eventLoop.post(JobFailed("Error occurred when generate retry configuration.", throwable)) + } + } + + + def handleExceedMaxRetry(emr: ExceedMaxRetry): Unit = { + eventLoop.post(JobFailed("Retry times exceed MaxRetry set in the KylinConfig.", emr.throwable)) + } + + def handleUnknownThrowable(ur: UnknownThrowable): Unit = { + eventLoop.post(JobFailed("Unknown error occurred during the job.", ur.throwable)) + } +} + +case class RetryInfo(overrideConf: java.util.Map[String, String], throwable: Throwable) { + override def toString: String = { + s"""RetryInfo{ + | overrideConf : $overrideConf, + | throwable : ${ThrowableUtil.stackTraceToString(throwable)} + |}""".stripMargin + } +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala new file mode 100644 index 0000000000..1837b92fa0 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorkSpace.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.application + +import io.kyligence.kap.engine.spark.job.ParamsConstants +import org.apache.commons.lang.StringUtils +import org.apache.commons.lang3.exception.ExceptionUtils +import org.apache.kylin.common.util.{JsonUtil, Unsafe} +import org.apache.kylin.engine.spark.application.SparkApplication +import org.apache.kylin.engine.spark.job.KylinBuildEnv +import org.apache.kylin.engine.spark.scheduler._ +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.KylinJobEventLoop + +import java.util +import java.util.concurrent.CountDownLatch + +/** + * Spark driver part, construct the real spark job [SparkApplication] + */ +object JobWorkSpace extends Logging { + def execute(args: Array[String]): Unit = { + try { + val (application, appArgs) = resolveArgs(args) + val eventLoop = new KylinJobEventLoop + val worker = new JobWorker(application, appArgs, eventLoop) + val monitor = new JobMonitor(eventLoop) + val workspace = new JobWorkSpace(eventLoop, monitor, worker) + val statusCode = workspace.run() + if (statusCode != 0) { + Unsafe.systemExit(statusCode) + } + } catch { + case throwable: Throwable => + logError("Error occurred when init job workspace.", throwable) + Unsafe.systemExit(1) + } + } + + def resolveArgs(args: Array[String]): (SparkApplication, Array[String]) = { + if (args.length < 2 || args(0) != "-className") throw new IllegalArgumentException("className is required") + val className = args(1) + // scalastyle:off + val o = Class.forName(className).newInstance + // scalastyle:on + if (!o.isInstanceOf[SparkApplication]) throw new IllegalArgumentException(className + " is not a subClass of AbstractApplication") + val appArgs = args.slice(2, args.length) + val application = o.asInstanceOf[SparkApplication] + (application, appArgs) + } +} + +class JobWorkSpace(eventLoop: KylinJobEventLoop, monitor: JobMonitor, worker: JobWorker) extends Logging { + require(eventLoop != null) + require(monitor != null) + require(worker != null) + + private var statusCode: Int = 0 + private val latch = new CountDownLatch(1) + + eventLoop.registerListener(new KylinJobListener { + override def onReceive(event: KylinJobEvent): Unit = { + event match { + case _: JobSucceeded => success() + case jf: JobFailed => fail(jf) + case _ => + } + } + }) + + def run(): Int = { + eventLoop.start() + eventLoop.post(RunJob()) + latch.await() + statusCode + } + + def success(): Unit = { + try { + stop() + } finally { + statusCode = 0 + latch.countDown() + } + } + + def fail(jf: JobFailed): Unit = { + try { + logError(s"Job failed eventually. 
Reason: ${jf.reason}", jf.throwable) + KylinBuildEnv.get().buildJobInfos.recordJobRetryInfos(RetryInfo(new util.HashMap, jf.throwable)) + updateJobErrorInfo(jf) + stop() + } finally { + statusCode = 1 + latch.countDown() + } + } + + def stop(): Unit = { + monitor.stop() + worker.stop() + eventLoop.stop() + } + + def updateJobErrorInfo(jf: JobFailed): Unit = { + val infos = KylinBuildEnv.get().buildJobInfos + val context = worker.getApplication + + val project = context.getProject + val jobId = context.getJobId + + val stageId = infos.getStageId + val jobStepId = StringUtils.replace(infos.getJobStepId, SparkApplication.JOB_NAME_PREFIX, "") + val failedStepId = if (StringUtils.isBlank(stageId)) jobStepId else stageId + + val failedSegmentId = infos.getSegmentId + val failedStack = ExceptionUtils.getStackTrace(jf.throwable) + val failedReason = + if (context.getAtomicUnreachableSparkMaster.get()) "Unable connect spark master to reach timeout maximum time" + else jf.reason + val url = "/kylin/api/jobs/error" + + val payload: util.HashMap[String, Object] = new util.HashMap[String, Object](5) + payload.put("project", project) + payload.put("job_id", jobId) + payload.put("failed_step_id", failedStepId) + payload.put("failed_segment_id", failedSegmentId) + payload.put("failed_stack", failedStack) + payload.put("failed_reason", failedReason) + val json = JsonUtil.writeValueAsString(payload) + val params = new util.HashMap[String, String]() + val config = KylinBuildEnv.get().kylinConfig + params.put(ParamsConstants.TIME_OUT, config.getUpdateJobInfoTimeout.toString) + params.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true)) + context.getReport.updateSparkJobInfo(params, url, json); + } +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala new file mode 100644 index 0000000000..069ac171cc --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.application + +import java.util.concurrent.Executors + +import org.apache.kylin.engine.spark.application.SparkApplication +import org.apache.kylin.engine.spark.scheduler._ +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.KylinJobEventLoop + +class JobWorker(application: SparkApplication, args: Array[String], eventLoop: KylinJobEventLoop) extends Logging { + private val pool = Executors.newSingleThreadExecutor() + + def getApplication: SparkApplication = application + + eventLoop.registerListener(new KylinJobListener { + override def onReceive(event: KylinJobEvent): Unit = { + event match { + case _: RunJob => runJob() + case _ => + } + } + }) + + def stop(): Unit = { + pool.shutdownNow() + application.logJobInfo() + } + + private def runJob(): Unit = { + execute() + } + + + private def execute(): Unit = { + pool.execute(new Runnable { + override def run(): Unit = { + try { + application.execute(args) + eventLoop.post(JobSucceeded()) + } catch { + case exception: NoRetryException => eventLoop.post(UnknownThrowable(exception)) + case throwable: Throwable => eventLoop.post(ResourceLack(throwable)) + } + } + }) + } +} + +class NoRetryException(msg: String) extends java.lang.Exception(msg) { + def this() { + this(null) + } +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala new file mode 100644 index 0000000000..5cefd24054 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/deploy/master/StandaloneClusterManager.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.master + +import java.util + +import org.apache.kylin.cluster.{AvailableResource, IClusterManager, ResourceInfo} +import org.apache.kylin.common.KylinConfig +import org.apache.spark.deploy.DeployMessages.{KillApplication, MasterStateResponse, RequestMasterState} +import org.apache.spark.deploy.master.StandaloneClusterManager.masterEndpoints +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcAddress, RpcEnv} +import org.apache.spark.sql.SparkSession +import org.apache.spark.util.Utils +import org.apache.spark.{SecurityManager, SparkConf} + +// scalastyle:off +class StandaloneClusterManager extends IClusterManager with Logging { + + private val JOB_STEP_PREFIX = "job_step_" + + override def fetchMaximumResourceAllocation: ResourceInfo = { + val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState) + val aliveWorkers = state.workers.filter(_.state == WorkerState.ALIVE) + val availableMem = aliveWorkers.map(_.memoryFree).sum + val availableCores = aliveWorkers.map(_.coresFree).sum + logInfo(s"Get available resource, " + + s"availableMem: $availableMem, availableCores: $availableCores") + ResourceInfo(availableMem, availableCores) + } + + override def fetchQueueAvailableResource(queueName: String): AvailableResource = { + val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState) + val aliveWorkers = state.workers.filter(_.state == WorkerState.ALIVE) + val availableMem = aliveWorkers.map(_.memoryFree).sum + val availableCores = aliveWorkers.map(_.coresFree).sum + val totalMem = aliveWorkers.map(_.memory).sum + val totalCores = aliveWorkers.map(_.cores).sum + logInfo(s"Get available resource, " + + s"availableMem: $availableMem, availableCores: $availableCores, " + + s"totalMem: $totalMem, totalCores: $totalCores") + AvailableResource(ResourceInfo(availableMem, availableCores), ResourceInfo(totalMem, totalCores)) + } + + override def getBuildTrackingUrl(sparkSession: SparkSession): String = { + val applicationId = sparkSession.sparkContext.applicationId + logInfo(s"Get tracking url of application $applicationId") + val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState) + val app = state.activeApps.find(_.id == applicationId).orNull + if (app == null) { + logInfo(s"No active application found of applicationId $applicationId") + return null + } + app.desc.appUiUrl + } + + override def killApplication(jobStepId: String): Unit = { + killApplication(s"$JOB_STEP_PREFIX", jobStepId) + } + + override def killApplication(jobStepPrefix: String, jobStepId: String): Unit = { + val master = masterEndpoints(0) + val state = master.askSync[MasterStateResponse](RequestMasterState) + val app = state.activeApps.find(_.desc.name.equals(jobStepPrefix + jobStepId)).orNull + if (app == null) { + logInfo(s"No active application found of jobStepId $jobStepId") + return + } + logInfo(s"Kill application ${app.id} by jobStepId $jobStepId") + master.send(KillApplication(app.id)) + } + + override def getRunningJobs(queues: util.Set[String]): util.List[String] = { + val state = masterEndpoints(0).askSync[MasterStateResponse](RequestMasterState) + val jobStepNames = state.activeApps.map(_.desc.name) + logInfo(s"Get running jobs ${jobStepNames.toSeq}") + import scala.collection.JavaConverters._ + jobStepNames.toList.asJava + } + + override def fetchQueueStatistics(queueName: String): ResourceInfo = { + fetchMaximumResourceAllocation + } + + override def isApplicationBeenKilled(jobStepId: String): 
Boolean = { + val master = masterEndpoints(0) + val state = master.askSync[MasterStateResponse](RequestMasterState) + val app = state.completedApps.find(_.desc.name.equals(s"$JOB_STEP_PREFIX$jobStepId")).orNull + if (app == null) { + false + } else { + "KILLED".equals(app.state.toString) + } + } + + override def applicationExisted(jobId: String): Boolean = { + val master = masterEndpoints(0) + val state = master.askSync[MasterStateResponse](RequestMasterState) + val app = state.activeApps.find(_.desc.name.equalsIgnoreCase(jobId)).orNull + if (app == null) { + false + } else { + true + } + } + +} + +object StandaloneClusterManager extends Logging { + + private val ENDPOINT_NAME = "Master" + private val CLIENT_NAME = "kylinStandaloneClient" + private val SPARK_LOCAL = "local" + private val SPARK_MASTER = "spark.master" + private val SPARK_RPC_TIMEOUT = "spark.rpc.askTimeout" + private val SPARK_AUTHENTICATE = "spark.authenticate" + private val SPARK_AUTHENTICATE_SECRET = "spark.authenticate.secret" + private val SPARK_NETWORK_CRYPTO_ENABLED = "spark.network.crypto.enabled" + + private lazy val masterEndpoints = { + val overrideConfig = KylinConfig.getInstanceFromEnv.getSparkConfigOverride + val conf = new SparkConf() + if (!conf.contains(SPARK_MASTER) || conf.get(SPARK_MASTER).startsWith(SPARK_LOCAL)) { + conf.set(SPARK_MASTER, overrideConfig.get(SPARK_MASTER)) + } + if (!conf.contains(SPARK_RPC_TIMEOUT)) { + conf.set(SPARK_RPC_TIMEOUT, "10s") + } + if (overrideConfig.containsKey(SPARK_AUTHENTICATE) && "true".equals(overrideConfig.get(SPARK_AUTHENTICATE))) { + conf.set(SPARK_AUTHENTICATE, "true") + conf.set(SPARK_AUTHENTICATE_SECRET, overrideConfig.getOrDefault(SPARK_AUTHENTICATE_SECRET, "kylin")) + conf.set(SPARK_NETWORK_CRYPTO_ENABLED, overrideConfig.getOrDefault(SPARK_NETWORK_CRYPTO_ENABLED, "true")) + } + logInfo(s"Spark master ${conf.get(SPARK_MASTER)}") + val rpcEnv = RpcEnv.create(CLIENT_NAME, Utils.localHostName(), 0, conf, new SecurityManager(conf)) + val masterUrls = conf.get(SPARK_MASTER) + Utils.parseStandaloneMasterUrls(masterUrls) + .map(RpcAddress.fromSparkURL) + .map(rpcEnv.setupEndpointRef(_, ENDPOINT_NAME)) + } +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala new file mode 100644 index 0000000000..38f1554a01 --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/scheduler/KylinJobEventLoop.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import java.util +import java.util.{List => JList} + +import org.apache.kylin.engine.spark.scheduler.{KylinJobEvent, KylinJobListener} +import org.apache.spark.util.EventLoop + +import scala.collection.JavaConverters._ + +class KylinJobEventLoop extends EventLoop[KylinJobEvent]("spark-entry-event-loop") { + // listeners are only registered from a single thread, so a LinkedList is enough + private val listeners: JList[KylinJobListener] = new util.LinkedList[KylinJobListener]() + + def registerListener(listener: KylinJobListener): Boolean = { + listeners.add(listener) + } + + def unregisterListener(listener: KylinJobListener): Boolean = { + listeners.remove(listener) + } + + override protected def onReceive(event: KylinJobEvent): Unit = { + listeners.asScala.foreach(_.onReceive(event)) + } + + override protected def onError(e: Throwable): Unit = { + + } +}
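For reference, a minimal usage sketch (not part of the commit) of how the cluster-manager abstraction added above is intended to be consumed. ClusterManagerFactory.create and the IClusterManager calls come from the files in this diff; the object name, queue name and job-step identifier below are illustrative assumptions only.

    import org.apache.kylin.cluster.{ClusterManagerFactory, IClusterManager}
    import org.apache.kylin.common.KylinConfig

    object ClusterManagerSketch {
      def main(args: Array[String]): Unit = {
        // The factory reflectively instantiates the configured IClusterManager
        // (YarnClusterManager, K8sClusterManager or StandaloneClusterManager) and
        // wraps it in a SimpleTimeLimiter proxy, so a hung ResourceManager call
        // cannot block the build driver beyond the configured timeout.
        val config: KylinConfig = KylinConfig.getInstanceFromEnv
        val cm: IClusterManager = ClusterManagerFactory.create(config)

        // Typical calls made by the build engine; "default" is an illustrative queue name.
        val maxPerContainer = cm.fetchMaximumResourceAllocation
        val queueResource = cm.fetchQueueAvailableResource("default")
        println(s"max per container: $maxPerContainer, queue available: $queueResource")

        // Orphan applications are looked up and killed by job-step name; the
        // "job_step_" prefix matches the code above, the id here is illustrative.
        cm.killApplication("job_step_", "0000000000-01")
      }
    }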