This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new d39d1c9  [ZEPPELIN-5096] Error message for K8s launcher and Pending Status Improvement
d39d1c9 is described below

commit d39d1c9124c63be7afab871f9111f1428ac633a5
Author: Philipp Dallig <philipp.dal...@gmail.com>
AuthorDate: Tue Oct 13 16:16:37 2020 +0200

    [ZEPPELIN-5096] Error message for K8s launcher and Pending Status Improvement
    
    ### What is this PR for?
    This PR includes:
     - an error message from the K8s launcher to help users understand why the pod won't start
     - an option to disable timeout handling during the initial pod lifecycle phase `pending`
    
    ### What type of PR is it?
     - Improvement
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5096
    
    ### How should this be tested?
    * Travis-CI: https://travis-ci.org/github/Reamer/zeppelin/builds/735737408
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Philipp Dallig <philipp.dal...@gmail.com>
    
    Closes #3943 from Reamer/k8s_start and squashes the following commits:
    
    2c8b4a2db [Philipp Dallig] Add option to disable timeout handling when starting interpreter pods
    a68ce2ec9 [Philipp Dallig] Print a nice ErrorMessage
---
 docs/setup/operation/configuration.md              |  6 ++++
 .../zeppelin/conf/ZeppelinConfiguration.java       |  5 +++
 .../launcher/K8sRemoteInterpreterProcess.java      | 41 ++++++++++++++++++----
 .../launcher/K8sStandardInterpreterLauncher.java   |  5 +--
 .../launcher/K8sRemoteInterpreterProcessTest.java  | 13 +++++--
 5 files changed, 60 insertions(+), 10 deletions(-)

diff --git a/docs/setup/operation/configuration.md 
b/docs/setup/operation/configuration.md
index 9c6650b..19cfe12 100644
--- a/docs/setup/operation/configuration.md
+++ b/docs/setup/operation/configuration.md
@@ -431,6 +431,12 @@ If both are defined, then the **environment variables** 
will take priority.
     <td>zeppelin-server</td>
     <td>Name of the Zeppelin server service resources</td>
   </tr>
+  <tr>
+    <td><h6 class="properties">ZEPPELIN_K8S_TIMEOUT_DURING_PENDING</h6></td>
+    <td><h6 class="properties">zeppelin.k8s.timeout.during.pending</h6></td>
+    <td>true</td>
+    <td>Enables/disables timeout handling while an interpreter pod is in the pending phase during startup. Caution: disabling it can lead to an infinite loop if the pod never leaves the pending phase.</td>
+  </tr>
 </table>
 
 
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 7e5f006..cf88e1e 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -872,6 +872,10 @@ public class ZeppelinConfiguration extends 
XMLConfiguration {
     return getString(ConfVars.ZEPPELIN_K8S_SERVICE_NAME);
   }
 
+  public boolean getK8sTimeoutDuringPending() {
+    return getBoolean(ConfVars.ZEPPELIN_K8S_TIMEOUT_DURING_PENDING);
+  }
+
   public String getDockerContainerImage() {
     return getString(ConfVars.ZEPPELIN_DOCKER_CONTAINER_IMAGE);
   }
@@ -1082,6 +1086,7 @@ public class ZeppelinConfiguration extends 
XMLConfiguration {
     ZEPPELIN_K8S_SPARK_CONTAINER_IMAGE("zeppelin.k8s.spark.container.image", 
"apache/spark:latest"),
     ZEPPELIN_K8S_TEMPLATE_DIR("zeppelin.k8s.template.dir", "k8s"),
     ZEPPELIN_K8S_SERVICE_NAME("zeppelin.k8s.service.name", "zeppelin-server"),
+    ZEPPELIN_K8S_TIMEOUT_DURING_PENDING("zeppelin.k8s.timeout.during.pending", 
true),
 
     ZEPPELIN_DOCKER_CONTAINER_IMAGE("zeppelin.docker.container.image", 
"apache/zeppelin:" + Util.getVersion()),
 
diff --git 
a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
 
b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index 989bced..43358a4 100644
--- 
a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ 
b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -48,8 +48,10 @@ public class K8sRemoteInterpreterProcess extends 
RemoteInterpreterProcess {
   private final String sparkImage;
   private LocalPortForward localPortForward;
   private int podPort = K8S_INTERPRETER_SERVICE_PORT;
+  private String errorMessage;
 
   private final boolean isUserImpersonatedForSpark;
+  private final boolean timeoutDuringPending;
 
   private AtomicBoolean started = new AtomicBoolean(false);
   private Random rand = new Random();
@@ -76,7 +78,8 @@ public class K8sRemoteInterpreterProcess extends 
RemoteInterpreterProcess {
           String sparkImage,
           int connectTimeout,
           int connectionPoolSize,
-          boolean isUserImpersonatedForSpark
+          boolean isUserImpersonatedForSpark,
+          boolean timeoutDuringPending
   ) {
     super(connectTimeout, connectionPoolSize, intpEventServerHost, 
intpEventServerPort);
     this.client = client;
@@ -92,6 +95,7 @@ public class K8sRemoteInterpreterProcess extends 
RemoteInterpreterProcess {
     this.sparkImage = sparkImage;
     this.podName = interpreterGroupName.toLowerCase() + "-" + 
getRandomString(6);
     this.isUserImpersonatedForSpark = isUserImpersonatedForSpark;
+    this.timeoutDuringPending = timeoutDuringPending;
   }
 
 
@@ -133,21 +137,37 @@ public class K8sRemoteInterpreterProcess extends 
RemoteInterpreterProcess {
       localPortForward = 
client.pods().inNamespace(namespace).withName(podName).portForward(K8S_INTERPRETER_SERVICE_PORT,
 podPort);
     }
 
+    // special handling if we don't want to time out the process during the pending lifecycle phase
+    if (!timeoutDuringPending) {
+      while ("pending".equalsIgnoreCase(getPodPhase()) && 
!Thread.currentThread().isInterrupted()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          LOGGER.error("Interrupt received during pending phase. Try to stop 
the interpreter and interrupt the current thread.", e);
+          errorMessage = "Start process was interrupted during the pending 
phase";
+          stop();
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
     long startTime = System.currentTimeMillis();
     long timeoutTime = startTime + getConnectTimeout();
 
     // wait until interpreter send started message through thrift rpc
     synchronized (started) {
-      while (!started.get()) {
+      while (!started.get() && !Thread.currentThread().isInterrupted()) {
         long timetoTimeout = timeoutTime - System.currentTimeMillis();
         if (timetoTimeout <= 0) {
+          errorMessage = "The start process was aborted while waiting for the 
interpreter to start. PodPhase before stop: " + getPodPhase();
           stop();
           throw new IOException("Launching zeppelin interpreter on kubernetes 
is time out, kill it now");
         }
         try {
           started.wait(timetoTimeout);
         } catch (InterruptedException e) {
-          LOGGER.error("Interrupt received. Try to stop the interpreter and 
interrupt the current thread.", e);
+          LOGGER.error("Interrupt received during started wait. Try to stop 
the interpreter and interrupt the current thread.", e);
+          errorMessage = "The start process was interrupted while waiting for 
the interpreter to start. PodPhase before stop: " + getPodPhase();
           stop();
           Thread.currentThread().interrupt();
         }
@@ -155,15 +175,17 @@ public class K8sRemoteInterpreterProcess extends 
RemoteInterpreterProcess {
     }
 
     // waits for interpreter thrift rpc server ready
-    while (!RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), 
getPort())) {
+    while (!RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), 
getPort()) && !Thread.currentThread().isInterrupted()) {
       if (System.currentTimeMillis() - timeoutTime > 0) {
+        errorMessage = "The start process was aborted while waiting for the 
accessibility check of the remote end point. PodPhase before stop: " + 
getPodPhase();
         stop();
         throw new IOException("Launching zeppelin interpreter on kubernetes is 
time out, kill it now");
       }
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {
-        LOGGER.error("Interrupt received. Try to stop the interpreter and 
interrupt the current thread.", e);
+        LOGGER.error("Interrupt received during remote endpoint accessible 
check. Try to stop the interpreter and interrupt the current thread.", e);
+        errorMessage = "The start process was interrupted while waiting for 
the accessibility check of the remote end point. PodPhase before stop: " + 
getPodPhase();
         stop();
         Thread.currentThread().interrupt();
       }
@@ -223,6 +245,13 @@ public class K8sRemoteInterpreterProcess extends 
RemoteInterpreterProcess {
     return false;
   }
 
+  public String getPodPhase() {
+    Pod pod = client.pods().inNamespace(namespace).withName(podName).get();
+    if (pod != null) {
+      return pod.getStatus().getPhase();
+    }
+    return "Unknown";
+  }
   /**
    * Apply spec file(s) in the path.
    * @param path
@@ -438,6 +467,6 @@ public class K8sRemoteInterpreterProcess extends 
RemoteInterpreterProcess {
 
   @Override
   public String getErrorMessage() {
-    return null;
+    return String.format("%s%ncurrent PodPhase: %s", errorMessage, 
getPodPhase());
   }
 }
diff --git 
a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
 
b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
index 6eeb915..fcfe9ec 100644
--- 
a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
+++ 
b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
@@ -153,7 +153,8 @@ public class K8sStandardInterpreterLauncher extends 
InterpreterLauncher {
             zConf.getK8sSparkContainerImage(),
             getConnectTimeout(),
             getConnectPoolSize(),
-            isUserImpersonateForSparkInterpreter(context));
+            isUserImpersonateForSparkInterpreter(context),
+            zConf.getK8sTimeoutDuringPending());
   }
 
   protected Map<String, String> 
buildEnvFromProperties(InterpreterLaunchContext context) {
@@ -173,7 +174,7 @@ public class K8sStandardInterpreterLauncher extends 
InterpreterLauncher {
     return env;
   }
 
-  String readFile(String path, Charset encoding) throws IOException {
+  private String readFile(String path, Charset encoding) throws IOException {
     byte[] encoded = Files.readAllBytes(Paths.get(path));
     return new String(encoded, encoding);
   }
diff --git 
a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
 
b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index b85ade0..95cd604 100644
--- 
a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ 
b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -58,6 +58,7 @@ public class K8sRemoteInterpreterProcessTest {
         "spark-container:1.0",
         10,
         10,
+        false,
         false);
 
     // then
@@ -87,6 +88,7 @@ public class K8sRemoteInterpreterProcessTest {
         "spark-container:1.0",
         10,
         10,
+        false,
         false);
 
 
@@ -121,6 +123,7 @@ public class K8sRemoteInterpreterProcessTest {
         "spark-container:1.0",
         10,
         10,
+        false,
         false);
 
     // when
@@ -173,6 +176,7 @@ public class K8sRemoteInterpreterProcessTest {
         "spark-container:1.0",
         10,
         10,
+        false,
         false);
 
     // when
@@ -224,7 +228,8 @@ public class K8sRemoteInterpreterProcessTest {
         "spark-container:1.0",
         10,
         10,
-        true);
+        true,
+        false);
 
     // when
     Properties p = intp.getTemplateBindings("mytestUser");
@@ -274,7 +279,8 @@ public class K8sRemoteInterpreterProcessTest {
         "spark-container:1.0",
         10,
         10,
-        true);
+        true,
+        false);
 
     // when
     Properties p = intp.getTemplateBindings("anonymous");
@@ -313,6 +319,7 @@ public class K8sRemoteInterpreterProcessTest {
         "spark-container:1.0",
         10,
         10,
+        false,
         false);
 
     // when non template url
@@ -357,6 +364,7 @@ public class K8sRemoteInterpreterProcessTest {
         "spark-container:1.0",
         10,
         10,
+        false,
         false);
 
     // when
@@ -393,6 +401,7 @@ public class K8sRemoteInterpreterProcessTest {
         "spark-container:1.0",
         10,
         10,
+        false,
         false);
 
     // when

Reply via email to