This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new ceb24b8  [ZEPPELIN-4662]. Spark driver OOM cause Connection refused error in frontend
ceb24b8 is described below

commit ceb24b84caf9f4468c45b06ae6d26bec0682a839
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Sun Mar 1 23:17:03 2020 +0800

    [ZEPPELIN-4662]. Spark driver OOM cause Connection refused error in frontend
    
    ### What is this PR for?
    This PR improves the error message shown in the frontend when the Spark
    driver runs out of memory (OOM) in YARN mode. Currently Zeppelin displays
    "Connection refused" in the frontend when the Spark driver hits an OOM;
    with this PR, the actual YARN diagnostic info is displayed instead.
    To do this, the PR launches a YARN app monitoring thread that monitors the
    status of all YARN apps and captures the YARN diagnostics when an app fails.
    
    ### What type of PR is it?
    [Feature]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4662
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
![image](https://user-images.githubusercontent.com/164491/75843520-4d709b80-5e0e-11ea-98d7-56ee394692f2.png)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3674 from zjffdu/ZEPPELIN-4662 and squashes the following commits:
    
    b6d5e0b16 [Jeff Zhang] [ZEPPELIN-4662]. Spark driver OOM cause Connection refused error in frontend
---
 conf/zeppelin-site.xml.template                    |   6 ++
 .../zeppelin/conf/ZeppelinConfiguration.java       |   3 +
 .../zeppelin/interpreter/util/ProcessLauncher.java |   4 +
 .../zeppelin/interpreter/YarnAppMonitor.java       | 111 +++++++++++++++++++++
 .../interpreter/remote/RemoteInterpreter.java      |   6 +-
 .../remote/RemoteInterpreterManagedProcess.java    |  34 ++++++-
 .../interpreter/remote/RemoteInterpreterTest.java  |  46 +++------
 7 files changed, 172 insertions(+), 38 deletions(-)

diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 1bdc65f..b7c3728 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -456,6 +456,12 @@
   <description>Enable directory listings on server.</description>
 </property>
 
+<property>
+  <name>zeppelin.interpreter.yarn.monitor.interval_secs</name>
+  <value>10</value>
+  <description>Check interval in secs for yarn apps monitors</description>
+</property>
+
 <!--
 <property>
   <name>zeppelin.interpreter.lifecyclemanager.class</name>
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 43f9ec7..2319606 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -935,6 +935,9 @@ public class ZeppelinConfiguration extends XMLConfiguration 
{
     ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD(
         "zeppelin.interpreter.lifecyclemanager.timeout.threshold", 3600000L),
 
+    ZEPPELIN_INTERPRETER_YARN_MONITOR_INTERVAL_SECS(
+            "zeppelin.interpreter.yarn.monitor.interval_secs", 10),
+
     
ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE("zeppelin.scheduler.threadpool.size", 
100),
 
     ZEPPELIN_OWNER_ROLE("zeppelin.notebook.default.owner.username", ""),
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
index 85c1cfe..b45db06 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
@@ -144,6 +144,10 @@ public abstract class ProcessLauncher implements 
ExecuteResultHandler {
     }
   }
 
+  public String getProcessLaunchOutput() {
+    return this.processOutput.getProcessExecutionOutput();
+  }
+
   public boolean isLaunchTimeout() {
     return launchTimeout;
   }
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java
new file mode 100644
index 0000000..8f4980b
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/YarnAppMonitor.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.scheduler.SchedulerThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class will launch a thread to check yarn app status regularly.
+ */
+public class YarnAppMonitor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(YarnAppMonitor.class);
+  private static YarnAppMonitor instance;
+
+  private ZeppelinConfiguration conf;
+  private ScheduledExecutorService executor;
+  private YarnClient yarnClient;
+  private ConcurrentHashMap<ApplicationId, RemoteInterpreterManagedProcess> 
apps;
+
+  public static synchronized YarnAppMonitor get() {
+    if (instance == null) {
+      instance = new YarnAppMonitor();
+    }
+    return instance;
+  }
+
+  private YarnAppMonitor() {
+    try {
+      this.conf = ZeppelinConfiguration.create();
+      this.yarnClient = YarnClient.createYarnClient();
+      YarnConfiguration yarnConf = new YarnConfiguration();
+      // disable timeline service as we only query yarn app here.
+      // Otherwise we may hit this kind of ERROR:
+      // java.lang.ClassNotFoundException: 
com.sun.jersey.api.client.config.ClientConfig
+      yarnConf.set("yarn.timeline-service.enabled", "false");
+      yarnClient.init(yarnConf);
+      yarnClient.start();
+      this.executor = Executors.newSingleThreadScheduledExecutor(new 
SchedulerThreadFactory("YarnAppsMonitor-Thread"));
+      this.apps = new ConcurrentHashMap<>();
+      this.executor.scheduleAtFixedRate(() -> {
+                try {
+                  Iterator<Map.Entry<ApplicationId, 
RemoteInterpreterManagedProcess>> iter = apps.entrySet().iterator();
+                  while (iter.hasNext()) {
+                    Map.Entry<ApplicationId, RemoteInterpreterManagedProcess> 
entry = iter.next();
+                    ApplicationId appId = entry.getKey();
+                    RemoteInterpreterManagedProcess interpreterManagedProcess 
= entry.getValue();
+                    ApplicationReport appReport = 
yarnClient.getApplicationReport(appId);
+                    if (appReport.getYarnApplicationState() == 
YarnApplicationState.FAILED ||
+                            appReport.getYarnApplicationState() == 
YarnApplicationState.KILLED) {
+                      String yarnDiagnostics = appReport.getDiagnostics();
+                      interpreterManagedProcess.processStopped("Yarn 
diagnostics: " + yarnDiagnostics);
+                      iter.remove();
+                      LOGGER.info("Remove " + appId + " from YarnAppMonitor, 
because its state is " +
+                              appReport.getYarnApplicationState());
+                    } else if (appReport.getYarnApplicationState() == 
YarnApplicationState.FINISHED) {
+                      iter.remove();
+                      LOGGER.info("Remove " + appId + " from YarnAppMonitor, 
because its state is " +
+                              appReport.getYarnApplicationState());
+                    }
+                  }
+                } catch (Exception e) {
+                  LOGGER.warn("Fail to check yarn app status", e);
+                }
+              },
+              conf.getInt("zeppelin.interpreter.yarn.monitor.interval_secs", 
10),
+              conf.getInt("zeppelin.interpreter.yarn.monitor.interval_secs", 
10),
+              TimeUnit.SECONDS);
+
+      LOGGER.info("YarnAppMonitor is started");
+    } catch (Throwable e) {
+      LOGGER.warn("Fail to initialize YarnAppMonitor", e);
+    }
+  }
+
+  public void addYarnApp(ApplicationId appId, RemoteInterpreterManagedProcess 
interpreterManagedProcess) {
+    LOGGER.info("Add " + appId + " to YarnAppMonitor");
+    this.apps.put(appId, interpreterManagedProcess);
+  }
+}
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index c9b1256..692224b 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -162,7 +162,7 @@ public class RemoteInterpreter extends Interpreter {
       if (!isCreated) {
         this.interpreterProcess = getOrCreateInterpreterProcess();
         if (!interpreterProcess.isRunning()) {
-          throw new IOException("Interpreter process is not running:\n" +
+          throw new IOException("Interpreter process is not running\n" +
                   interpreterProcess.getErrorMessage());
         }
         interpreterProcess.callRemoteFunction(new 
RemoteInterpreterProcess.RemoteFunction<Void>() {
@@ -218,8 +218,8 @@ public class RemoteInterpreter extends Interpreter {
       throw new InterpreterException(e);
     }
     if (!interpreterProcess.isRunning()) {
-      throw new InterpreterException("Interpreter process is not running:\n" +
-              interpreterProcess.getErrorMessage());
+      return new InterpreterResult(InterpreterResult.Code.ERROR,
+              "Interpreter process is not running\n" + 
interpreterProcess.getErrorMessage());
     }
     this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), 
sessionId);
     return interpreterProcess.callRemoteFunction(
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 3c0f1e4..25255a4 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -21,6 +21,9 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.exec.CommandLine;
 import org.apache.commons.exec.ExecuteException;
 import org.apache.commons.exec.environment.EnvironmentUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.zeppelin.interpreter.YarnAppMonitor;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.apache.zeppelin.interpreter.util.ProcessLauncher;
 import org.slf4j.Logger;
@@ -28,6 +31,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * This class manages start / stop of remote interpreter process
@@ -35,6 +40,8 @@ import java.util.Map;
 public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
   private static final Logger LOGGER = LoggerFactory.getLogger(
       RemoteInterpreterManagedProcess.class);
+  private static final Pattern YARN_APP_PATTER =
+          Pattern.compile("Submitted application (\\w+)");
 
   private final String interpreterRunner;
   private final int zeppelinServerRPCPort;
@@ -48,6 +55,7 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
   private final String interpreterSettingName;
   private final String interpreterGroupId;
   private final boolean isUserImpersonated;
+  private String errorMessage;
 
   private Map<String, String> env;
 
@@ -120,9 +128,18 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
               "setting zeppelin.interpreter.connect.timeout of this 
interpreter.\n" +
               interpreterProcessLauncher.getErrorMessage());
     }
+
     if (!interpreterProcessLauncher.isRunning()) {
       throw new IOException("Fail to launch interpreter process:\n" +
               interpreterProcessLauncher.getErrorMessage());
+    } else {
+      String launchOutput = 
interpreterProcessLauncher.getProcessLaunchOutput();
+      Matcher m = YARN_APP_PATTER.matcher(launchOutput);
+      if (m.find()) {
+        String appId = m.group(1);
+        LOGGER.info("Detected yarn app: " + appId + ", add it to 
YarnAppMonitor");
+        YarnAppMonitor.get().addYarnApp(ConverterUtils.toApplicationId(appId), 
this);
+      }
     }
   }
 
@@ -143,12 +160,9 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
 
       // Shutdown connection
       shutdown();
-
       this.interpreterProcessLauncher.stop();
     }
 
-
-
     interpreterProcessLauncher = null;
     LOGGER.info("Remote process terminated");
   }
@@ -157,9 +171,16 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
   public void processStarted(int port, String host) {
     this.port = port;
     this.host = host;
+    // for yarn cluster it may be transitioned from COMPLETED to RUNNING.
     interpreterProcessLauncher.onProcessRunning();
   }
 
+  // called when remote interpreter process is stopped, e.g. YarnAppsMonitor 
will call this
+  // after detecting yarn app is killed/failed.
+  public void processStopped(String errorMessage) {
+    this.errorMessage = errorMessage;
+  }
+
   @VisibleForTesting
   public Map<String, String> getEnv() {
     return env;
@@ -194,12 +215,15 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess {
   }
 
   public boolean isRunning() {
-    return interpreterProcessLauncher != null && 
interpreterProcessLauncher.isRunning();
+    return interpreterProcessLauncher != null && 
interpreterProcessLauncher.isRunning()
+            && errorMessage == null;
   }
 
   @Override
   public String getErrorMessage() {
-    return this.interpreterProcessLauncher != null ? 
this.interpreterProcessLauncher.getErrorMessage() : "";
+    String interpreterProcessError = this.interpreterProcessLauncher != null
+            ? this.interpreterProcessLauncher.getErrorMessage() : "";
+    return errorMessage != null ? errorMessage : interpreterProcessError;
   }
 
   private class InterpreterProcessLauncher extends ProcessLauncher {
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index 70a4db7..9beaaeb 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -93,19 +93,10 @@ public class RemoteInterpreterTest extends 
AbstractInterpreterTest {
     // RemoteInterpreterProcess leakage.
     
remoteInterpreter1.getInterpreterGroup().close(remoteInterpreter1.getSessionId());
     
assertNull(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess());
-    try {
-      assertEquals("hello", remoteInterpreter1.interpret("hello", 
context1).message().get(0).getData());
-      fail("Should not be able to call interpret after interpreter is closed");
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
 
-    try {
-      assertEquals("hello", remoteInterpreter2.interpret("hello", 
context1).message().get(0).getData());
-      fail("Should not be able to call getProgress after 
RemoterInterpreterProcess is stoped");
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
+    InterpreterResult result = remoteInterpreter1.interpret("hello", context1);
+    assertEquals(Code.ERROR, result.code());
+    assertEquals("Interpreter process is not running\n", 
result.message().get(0).getData());
   }
 
   @Test
@@ -145,12 +136,10 @@ public class RemoteInterpreterTest extends 
AbstractInterpreterTest {
     
assertTrue(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess().isRunning());
     assertEquals("hello", remoteInterpreter2.interpret("hello", 
context1).message().get(0).getData());
     
remoteInterpreter2.getInterpreterGroup().close(remoteInterpreter2.getSessionId());
-    try {
-      assertEquals("hello", remoteInterpreter2.interpret("hello", context1));
-      fail("Should not be able to call interpret after interpreter is closed");
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
+
+    InterpreterResult result = remoteInterpreter2.interpret("hello", context1);
+    assertEquals(Code.ERROR, result.code());
+    assertEquals("Interpreter process is not running\n", 
result.message().get(0).getData());
     
assertNull(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess());
   }
 
@@ -182,21 +171,18 @@ public class RemoteInterpreterTest extends 
AbstractInterpreterTest {
     
remoteInterpreter1.getInterpreterGroup().close(remoteInterpreter1.getSessionId());
     
assertNull(remoteInterpreter1.getInterpreterGroup().getRemoteInterpreterProcess());
     
assertTrue(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess().isRunning());
-    try {
-      remoteInterpreter1.interpret("hello", context1);
-      fail("Should not be able to call getProgress after interpreter is 
closed");
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
+
+    InterpreterResult result = remoteInterpreter1.interpret("hello", context1);
+    assertEquals(Code.ERROR, result.code());
+    assertEquals("Interpreter process is not running\n", 
result.message().get(0).getData());
 
     assertEquals("hello", remoteInterpreter2.interpret("hello", 
context1).message().get(0).getData());
     
remoteInterpreter2.getInterpreterGroup().close(remoteInterpreter2.getSessionId());
-    try {
-      assertEquals("hello", remoteInterpreter2.interpret("hello", 
context1).message().get(0).getData());
-      fail("Should not be able to call interpret after interpreter is closed");
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
+
+    result = remoteInterpreter2.interpret("hello", context1);
+    assertEquals(Code.ERROR, result.code());
+    assertEquals("Interpreter process is not running\n", 
result.message().get(0).getData());
+
     
assertNull(remoteInterpreter2.getInterpreterGroup().getRemoteInterpreterProcess());
 
   }

Reply via email to