This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new d39d1c9 [ZEPPELIN-5096] Error message for K8s launcher and Pending Status Improvement d39d1c9 is described below commit d39d1c9124c63be7afab871f9111f1428ac633a5 Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Tue Oct 13 16:16:37 2020 +0200 [ZEPPELIN-5096] Error message for K8s launcher and Pending Status Improvement ### What is this PR for? This PR includes: - an error message from the K8s launcher to help users understand why the Pod won't start - an option to disable the timeout handling during the initial pod lifecycle phase `pending` ### What type of PR is it? - Improvement ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5096 ### How should this be tested? * Travis-CI: https://travis-ci.org/github/Reamer/zeppelin/builds/735737408 ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Philipp Dallig <philipp.dal...@gmail.com> Closes #3943 from Reamer/k8s_start and squashes the following commits: 2c8b4a2db [Philipp Dallig] Add option to disable timeout handling when starting interpreter pods a68ce2ec9 [Philipp Dallig] Print a nice ErrorMessage --- docs/setup/operation/configuration.md | 6 ++++ .../zeppelin/conf/ZeppelinConfiguration.java | 5 +++ .../launcher/K8sRemoteInterpreterProcess.java | 41 ++++++++++++++++++---- .../launcher/K8sStandardInterpreterLauncher.java | 5 +-- .../launcher/K8sRemoteInterpreterProcessTest.java | 13 +++++-- 5 files changed, 60 insertions(+), 10 deletions(-) diff --git a/docs/setup/operation/configuration.md b/docs/setup/operation/configuration.md index 9c6650b..19cfe12 100644 --- a/docs/setup/operation/configuration.md +++ b/docs/setup/operation/configuration.md @@ -431,6 +431,12 @@ If both are defined, then the **environment variables** will take priority. 
<td>zeppelin-server</td> <td>Name of the Zeppelin server service resources</td> </tr> + <tr> + <td><h6 class="properties">ZEPPELIN_K8S_TIMEOUT_DURING_PENDING</h6></td> + <td><h6 class="properties">zeppelin.k8s.timeout.during.pending</h6></td> + <td>true</td> + <td>Value to enable/disable timeout handling when starting Interpreter Pods. Caution: This can lead to an infinity loop</td> + </tr> </table> diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 7e5f006..cf88e1e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -872,6 +872,10 @@ public class ZeppelinConfiguration extends XMLConfiguration { return getString(ConfVars.ZEPPELIN_K8S_SERVICE_NAME); } + public boolean getK8sTimeoutDuringPending() { + return getBoolean(ConfVars.ZEPPELIN_K8S_TIMEOUT_DURING_PENDING); + } + public String getDockerContainerImage() { return getString(ConfVars.ZEPPELIN_DOCKER_CONTAINER_IMAGE); } @@ -1082,6 +1086,7 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_K8S_SPARK_CONTAINER_IMAGE("zeppelin.k8s.spark.container.image", "apache/spark:latest"), ZEPPELIN_K8S_TEMPLATE_DIR("zeppelin.k8s.template.dir", "k8s"), ZEPPELIN_K8S_SERVICE_NAME("zeppelin.k8s.service.name", "zeppelin-server"), + ZEPPELIN_K8S_TIMEOUT_DURING_PENDING("zeppelin.k8s.timeout.during.pending", true), ZEPPELIN_DOCKER_CONTAINER_IMAGE("zeppelin.docker.container.image", "apache/zeppelin:" + Util.getVersion()), diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java index 989bced..43358a4 100644 --- 
a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java @@ -48,8 +48,10 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { private final String sparkImage; private LocalPortForward localPortForward; private int podPort = K8S_INTERPRETER_SERVICE_PORT; + private String errorMessage; private final boolean isUserImpersonatedForSpark; + private final boolean timeoutDuringPending; private AtomicBoolean started = new AtomicBoolean(false); private Random rand = new Random(); @@ -76,7 +78,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { String sparkImage, int connectTimeout, int connectionPoolSize, - boolean isUserImpersonatedForSpark + boolean isUserImpersonatedForSpark, + boolean timeoutDuringPending ) { super(connectTimeout, connectionPoolSize, intpEventServerHost, intpEventServerPort); this.client = client; @@ -92,6 +95,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { this.sparkImage = sparkImage; this.podName = interpreterGroupName.toLowerCase() + "-" + getRandomString(6); this.isUserImpersonatedForSpark = isUserImpersonatedForSpark; + this.timeoutDuringPending = timeoutDuringPending; } @@ -133,21 +137,37 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { localPortForward = client.pods().inNamespace(namespace).withName(podName).portForward(K8S_INTERPRETER_SERVICE_PORT, podPort); } + // special handling if we doesn't want timeout the process during lifecycle phase pending + if (!timeoutDuringPending) { + while ("pending".equalsIgnoreCase(getPodPhase()) && !Thread.currentThread().isInterrupted()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOGGER.error("Interrupt received during pending phase. 
Try to stop the interpreter and interrupt the current thread.", e); + errorMessage = "Start process was interrupted during the pending phase"; + stop(); + Thread.currentThread().interrupt(); + } + } + } + long startTime = System.currentTimeMillis(); long timeoutTime = startTime + getConnectTimeout(); // wait until interpreter send started message through thrift rpc synchronized (started) { - while (!started.get()) { + while (!started.get() && !Thread.currentThread().isInterrupted()) { long timetoTimeout = timeoutTime - System.currentTimeMillis(); if (timetoTimeout <= 0) { + errorMessage = "The start process was aborted while waiting for the interpreter to start. PodPhase before stop: " + getPodPhase(); stop(); throw new IOException("Launching zeppelin interpreter on kubernetes is time out, kill it now"); } try { started.wait(timetoTimeout); } catch (InterruptedException e) { - LOGGER.error("Interrupt received. Try to stop the interpreter and interrupt the current thread.", e); + LOGGER.error("Interrupt received during started wait. Try to stop the interpreter and interrupt the current thread.", e); + errorMessage = "The start process was interrupted while waiting for the interpreter to start. PodPhase before stop: " + getPodPhase(); stop(); Thread.currentThread().interrupt(); } @@ -155,15 +175,17 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { } // waits for interpreter thrift rpc server ready - while (!RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) { + while (!RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort()) && !Thread.currentThread().isInterrupted()) { if (System.currentTimeMillis() - timeoutTime > 0) { + errorMessage = "The start process was aborted while waiting for the accessibility check of the remote end point. 
PodPhase before stop: " + getPodPhase(); stop(); throw new IOException("Launching zeppelin interpreter on kubernetes is time out, kill it now"); } try { Thread.sleep(1000); } catch (InterruptedException e) { - LOGGER.error("Interrupt received. Try to stop the interpreter and interrupt the current thread.", e); + LOGGER.error("Interrupt received during remote endpoint accessible check. Try to stop the interpreter and interrupt the current thread.", e); + errorMessage = "The start process was interrupted while waiting for the accessibility check of the remote end point. PodPhase before stop: " + getPodPhase(); stop(); Thread.currentThread().interrupt(); } @@ -223,6 +245,13 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { return false; } + public String getPodPhase() { + Pod pod = client.pods().inNamespace(namespace).withName(podName).get(); + if (pod != null) { + return pod.getStatus().getPhase(); + } + return "Unknown"; + } /** * Apply spec file(s) in the path. 
* @param path @@ -438,6 +467,6 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { @Override public String getErrorMessage() { - return null; + return String.format("%s%ncurrent PodPhase: %s", errorMessage, getPodPhase()); } } diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java index 6eeb915..fcfe9ec 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java @@ -153,7 +153,8 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher { zConf.getK8sSparkContainerImage(), getConnectTimeout(), getConnectPoolSize(), - isUserImpersonateForSparkInterpreter(context)); + isUserImpersonateForSparkInterpreter(context), + zConf.getK8sTimeoutDuringPending()); } protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) { @@ -173,7 +174,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher { return env; } - String readFile(String path, Charset encoding) throws IOException { + private String readFile(String path, Charset encoding) throws IOException { byte[] encoded = Files.readAllBytes(Paths.get(path)); return new String(encoded, encoding); } diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java index b85ade0..95cd604 100644 --- 
a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java @@ -58,6 +58,7 @@ public class K8sRemoteInterpreterProcessTest { "spark-container:1.0", 10, 10, + false, false); // then @@ -87,6 +88,7 @@ public class K8sRemoteInterpreterProcessTest { "spark-container:1.0", 10, 10, + false, false); @@ -121,6 +123,7 @@ public class K8sRemoteInterpreterProcessTest { "spark-container:1.0", 10, 10, + false, false); // when @@ -173,6 +176,7 @@ public class K8sRemoteInterpreterProcessTest { "spark-container:1.0", 10, 10, + false, false); // when @@ -224,7 +228,8 @@ public class K8sRemoteInterpreterProcessTest { "spark-container:1.0", 10, 10, - true); + true, + false); // when Properties p = intp.getTemplateBindings("mytestUser"); @@ -274,7 +279,8 @@ public class K8sRemoteInterpreterProcessTest { "spark-container:1.0", 10, 10, - true); + true, + false); // when Properties p = intp.getTemplateBindings("anonymous"); @@ -313,6 +319,7 @@ public class K8sRemoteInterpreterProcessTest { "spark-container:1.0", 10, 10, + false, false); // when non template url @@ -357,6 +364,7 @@ public class K8sRemoteInterpreterProcessTest { "spark-container:1.0", 10, 10, + false, false); // when @@ -393,6 +401,7 @@ public class K8sRemoteInterpreterProcessTest { "spark-container:1.0", 10, 10, + false, false); // when