This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push: new be0aedd KYLIN-4449 The spark process is not actually killed after the job be paused be0aedd is described below commit be0aedd6a2a1cf23e4ff50597a264857d2dff22b Author: yaqian.zhang <598593...@qq.com> AuthorDate: Tue Sep 1 14:55:44 2020 +0800 KYLIN-4449 The spark process is not actually killed after the job be paused --- build/bin/kill-process-tree.sh | 56 ++++++++++++++++++++++ .../org/apache/kylin/common/JobProcessContext.java | 10 ++++ .../kylin/job/execution/AbstractExecutable.java | 3 +- .../kylin/job/execution/ExecutableManager.java | 38 ++++++++++++--- 4 files changed, 100 insertions(+), 7 deletions(-) diff --git a/build/bin/kill-process-tree.sh b/build/bin/kill-process-tree.sh new file mode 100644 index 0000000..a79ead8 --- /dev/null +++ b/build/bin/kill-process-tree.sh @@ -0,0 +1,56 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +function help_func { + echo "Usage: kill-process-tree.sh <PID>(process id)" + echo " kill-process-tree.sh 12345" + exit 1 +} + +function isRunning() { + [[ -n "$(ps -p $1 -o pid=)" ]] +} + +function killTree() { + local parent=$1 child + for child in $(ps ax -o ppid= -o pid= | awk "\$1==$parent {print \$2}"); do + killTree ${child} + done + kill ${parent} + if isRunning ${parent}; then + sleep 5 + if isRunning ${parent}; then + kill -9 ${parent} + fi + fi +} + +# Check parameters count. +if [[ $# -ne 1 ]]; then + help_func +fi + +# Check whether it contains non-digit characters. +# Remove all digit characters and check for length. +# If there's length it's not a number. +if [[ -n ${1//[0-9]/} ]]; then + help_func +fi + +killTree $@ \ No newline at end of file diff --git a/core-common/src/main/java/org/apache/kylin/common/JobProcessContext.java b/core-common/src/main/java/org/apache/kylin/common/JobProcessContext.java index 4166ce0..cb5ac47 100644 --- a/core-common/src/main/java/org/apache/kylin/common/JobProcessContext.java +++ b/core-common/src/main/java/org/apache/kylin/common/JobProcessContext.java @@ -18,8 +18,10 @@ package org.apache.kylin.common; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import java.lang.reflect.Field; import java.util.Map; public class JobProcessContext { @@ -37,4 +39,12 @@ public class JobProcessContext { public static void removeProcess(String jobId){ runningProcess.remove(jobId); } + + public static int getPid(Process process) throws IllegalAccessException, NoSuchFieldException { + String className = process.getClass().getName(); + Preconditions.checkState(className.equals("java.lang.UNIXProcess")); + Field f = process.getClass().getDeclaredField("pid"); + f.setAccessible(true); + return f.getInt(process); + } } \ No newline at end of file diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 7628975..29d9b5a 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -196,8 +196,9 @@ public abstract class AbstractExecutable implements Executable, Idempotent { try { result = doWork(executableContext); } catch (JobStoppedException e) { - //The job be paused, ignore it + logger.debug("The job be paused, ignore it: {}", this.toString()); } catch (Throwable e) { + logger.error("error running Executable: {}", this.toString()); catchedException = e; } finally { cleanup(result); diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 6b1242b..8772d32 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -24,11 +24,14 @@ import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_URL; import static org.apache.kylin.job.constant.ExecutableConstants.FLINK_JOB_ID; import java.io.IOException; +import java.nio.file.Paths; import java.util.HashMap; import java.util.IllegalFormatException; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.JobProcessContext; @@ -53,6 +56,9 @@ public class ExecutableManager { private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class); + private static final int CMD_EXEC_TIMEOUT_SEC = 60; + private static final String KILL_PROCESS_TREE = "kill-process-tree.sh"; + public static ExecutableManager getInstance(KylinConfig config) { return config.getManager(ExecutableManager.class); } @@ -499,12 +505,32 @@ public class ExecutableManager { } public void destroyProcess(String jobId) { - // in ut env, there is no process for job, just do nothing - if (!config.isUTEnv()) { - Process process = JobProcessContext.getProcess(jobId); - if (process != null && process.isAlive()) { - logger.info("Will destroy process " + process.toString()); - process.destroyForcibly(); + Process originProc = JobProcessContext.getProcess(jobId); + if (Objects.nonNull(originProc) && originProc.isAlive()) { + try { + final int ppid = JobProcessContext.getPid(originProc); + logger.info("start to destroy process {} of job {}", ppid, jobId); + //build cmd template + StringBuilder cmdBuilder = new StringBuilder("bash "); + cmdBuilder.append(Paths.get(KylinConfig.getKylinHome(), "bin", KILL_PROCESS_TREE)); + cmdBuilder.append(" "); + cmdBuilder.append(ppid); + final String killCmd = cmdBuilder.toString(); + Process killProc = Runtime.getRuntime().exec(killCmd); + if (killProc.waitFor(CMD_EXEC_TIMEOUT_SEC, TimeUnit.SECONDS)) { + logger.info("try to destroy process {} of job {}, exec cmd '{}', exitValue : {}", ppid, jobId, + killCmd, killProc.exitValue()); + if (!originProc.isAlive()) { + logger.info("destroy process {} of job {} SUCCEED.", ppid, jobId); + return; + } + logger.info("destroy process {} of job {} FAILED.", ppid, jobId); + } + + //generally, code executing wouldn't reach here + logger.warn("destroy process {} of job {} TIMEOUT exceed {}s.", ppid, jobId, CMD_EXEC_TIMEOUT_SEC); + } catch (Exception e) { + logger.error("destroy process of job {} FAILED.", jobId, e); } } }