Repository: zeppelin
Updated Branches:
  refs/heads/master a39ce0141 -> 341883d9e


[ZEPPELIN-1907] Shell Interpreter does not renew ticket on secure cluster

### What is this PR for?
Kerberos ticket and renew lifetimes are set to 1 hour. When the shell 
interpreter accesses secure Hadoop, it runs `kinit` and returns the result 
successfully, but after 1 hour the ticket expires and the Hadoop listing fails 
with the exception below.

```
%sh
hadoop fs -ls /

17/01/05 09:29:45 WARN ipc.Client: Exception encountered while connecting to 
the server :
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: 
No valid credentials provided (Mechanism level: Failed to find any Kerberos 
tgt)]
        at 
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
        at 
org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:413)
        at 
org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:595)
        at org.apache.hadoop.ipc.Client$Connection.access$2000(Client.java:397)
        at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:762)
        at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:758)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
        at 
org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:757)
        at org.apache.hadoop.ipc.Client$Connection.access$3200(Client.java:397)
        at org.apache.hadoop.ipc.Client.getConnection(Client.java:1618)
        at org.apache.hadoop.ipc.Client.call(Client.java:1449)
        at org.apache.hadoop.ipc.Client.call(Client.java:1396)
        at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
        at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
ls: Failed on local exception: java.io.IOException: 
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: 
No valid credentials provided (Mechanism level: Failed to find any Kerberos 
tgt)]; Host Details : local host is: "zeppelin1.hwxblr.com/10.0.1.57"; 
destination host is: "zeppelin1.hwxblr.com":8020;
ExitValue: 1
```

### What type of PR is it?
[Bug Fix]

### What is the Jira issue?
* [ZEPPELIN-1907](https://issues.apache.org/jira/browse/ZEPPELIN-1907)

### How should this be tested?
On a Kerberos-enabled cluster, run this paragraph:
```
%sh
hdfs dfs -ls /user/zeppelin/
```
Wait for the Kerberos ticket to expire (or run `kdestroy`), and re-run the same paragraph.

### Screenshots (if appropriate)
Before:
<img width="1438" alt="screen shot 2017-06-13 at 3 44 30 pm" 
src="https://user-images.githubusercontent.com/674497/27078184-511ed810-5050-11e7-8afa-90247f33047a.png">

After:
<img width="1438" alt="screen shot 2017-06-13 at 3 44 04 pm" 
src="https://user-images.githubusercontent.com/674497/27078183-5109d690-5050-11e7-82e4-d79a5e98295f.png">

### Questions:
* Do the license files need an update?
* Are there breaking changes for older versions?
* Does this need documentation?

Author: Prabhjyot Singh <prabhjyotsi...@gmail.com>
Author: Prabhjyot  Singh <prabhjyotsi...@gmail.com>

Closes #2407 from prabhjyotsingh/ZEPPELIN-1907 and squashes the following 
commits:

ffd5f11b2 [Prabhjyot  Singh] add bash after `
ecc1a7ce0 [Prabhjyot Singh] Merge remote-tracking branch 'origin/master' into 
ZEPPELIN-1907
9243c6ab9 [Prabhjyot Singh] replace `###` with `##`
443c407d3 [Prabhjyot Singh] add space before time(s)
adf23743b [Prabhjyot Singh] update documentation.
289b7d346 [Prabhjyot Singh] reset kinitFailCount on successful renew.
96bfdfe97 [Prabhjyot Singh] log more error
72b32ae25 [Prabhjyot Singh] add java doc
df6645a64 [Prabhjyot Singh] add KerberosInterpreter and move kinit loginc there.
856c8716e [Prabhjyot Singh] renew token periodically
ee741e483 [Prabhjyot Singh] @zjffdu review comments
7c539ef2e [Prabhjyot Singh] add null check
ab823d3ee [Prabhjyot Singh] relogin using keytab, and append message for the 
same


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/341883d9
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/341883d9
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/341883d9

Branch: refs/heads/master
Commit: 341883d9ed92a4079cb486d2b6b2b1d0c40f1575
Parents: a39ce01
Author: Prabhjyot Singh <prabhjyotsi...@gmail.com>
Authored: Fri Jun 23 21:51:00 2017 +0530
Committer: Prabhjyot Singh <prabhjyotsi...@gmail.com>
Committed: Tue Jun 27 08:50:13 2017 +0530

----------------------------------------------------------------------
 conf/zeppelin-env.sh.template                   |   5 +
 docs/interpreter/shell.md                       |  12 +-
 .../apache/zeppelin/shell/ShellInterpreter.java |  40 +++++-
 .../apache/zeppelin/interpreter/Constants.java  |  18 +++
 .../interpreter/KerberosInterpreter.java        | 125 +++++++++++++++++++
 5 files changed, 193 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/341883d9/conf/zeppelin-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-env.sh.template b/conf/zeppelin-env.sh.template
index ce55346..a9eccf6 100644
--- a/conf/zeppelin-env.sh.template
+++ b/conf/zeppelin-env.sh.template
@@ -53,6 +53,11 @@
 
 #### Spark interpreter configuration ####
 
+## Kerberos ticket refresh setting
+##
+#export KINIT_FAIL_THRESHOLD                    # (optional) How many times should kinit retry. The default value is 5.
+#export LAUNCH_KERBEROS_REFRESH_INTERVAL        # (optional) The refresh interval for Kerberos ticket. The default value is 1d.
+
 ## Use provided spark installation ##
 ## defining SPARK_HOME makes Zeppelin run spark interpreter process using 
spark-submit
 ##

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/341883d9/docs/interpreter/shell.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/shell.md b/docs/interpreter/shell.md
index a3d8cea..4db73d9 100644
--- a/docs/interpreter/shell.md
+++ b/docs/interpreter/shell.md
@@ -66,4 +66,14 @@ The following example demonstrates the basic usage of Shell 
in a Zeppelin notebo
 <img src="/assets/themes/zeppelin/img/docs-img/shell-example.png" />
 
 If you need further information about **Zeppelin Interpreter Setting** for 
using Shell interpreter, 
-please read [What is interpreter 
setting?](../usage/interpreter/overview.html#what-is-interpreter-setting) 
section first.
\ No newline at end of file
+please read [What is interpreter 
setting?](../usage/interpreter/overview.html#what-is-interpreter-setting) 
section first.
+
+## Kerberos refresh interval
+For changing the default behavior of when to renew Kerberos ticket following changes can be made in `conf/zeppelin-env.sh`.
+
+```bash
+# Change Kerberos refresh interval (default value is 1d). Allowed postfix are ms, s, m, min, h, and d.
+export LAUNCH_KERBEROS_REFRESH_INTERVAL=4h
+# Change kinit number retries (default value is 5), which means if the kinit command fails for 5 retries consecutively it will close the interpreter. 
+export KINIT_FAIL_THRESHOLD=10
+```
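
The suffix handling described in the documentation hunk above can be sketched as a small standalone class. This is only an illustration under stated assumptions, not the committed code: `IntervalSketch` and `toMillis` are hypothetical names, the suffix table contains only the postfixes the documentation lists, and a bare number is assumed to mean milliseconds, which is what `getTimeAsMs` in this commit does.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of the Zeppelin commit.
final class IntervalSketch {
  // Only the postfixes named in the documentation: ms, s, m, min, h, d.
  private static final Map<String, TimeUnit> SUFFIXES = new HashMap<>();
  static {
    SUFFIXES.put("ms", TimeUnit.MILLISECONDS);
    SUFFIXES.put("s", TimeUnit.SECONDS);
    SUFFIXES.put("m", TimeUnit.MINUTES);
    SUFFIXES.put("min", TimeUnit.MINUTES);
    SUFFIXES.put("h", TimeUnit.HOURS);
    SUFFIXES.put("d", TimeUnit.DAYS);
  }

  // A non-negative integer followed by an optional alphabetic suffix.
  private static final Pattern INTERVAL = Pattern.compile("([0-9]+)([a-z]+)?");

  // Converts strings such as "4h" or "1d" to milliseconds.
  static long toMillis(String interval) {
    Matcher m = INTERVAL.matcher(interval.trim().toLowerCase(Locale.ROOT));
    if (!m.matches()) {
      throw new IllegalArgumentException("Invalid time string: " + interval);
    }
    long value = Long.parseLong(m.group(1));
    String suffix = m.group(2);
    if (suffix == null) {
      return value; // assumption: no suffix means milliseconds
    }
    TimeUnit unit = SUFFIXES.get(suffix);
    if (unit == null) {
      throw new IllegalArgumentException("Invalid suffix: \"" + suffix + "\"");
    }
    return unit.toMillis(value);
  }

  public static void main(String[] args) {
    System.out.println(toMillis("4h")); // 14400000
    System.out.println(toMillis("1d")); // 86400000
  }
}
```

Unlike the pattern in the commit, this sketch rejects negative values, since a negative refresh interval has no obvious meaning.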

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/341883d9/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
----------------------------------------------------------------------
diff --git 
a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java 
b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
index ec75684..79fc3a3 100644
--- a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
+++ b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java
@@ -30,8 +30,8 @@ import org.apache.commons.exec.ExecuteException;
 import org.apache.commons.exec.ExecuteWatchdog;
 import org.apache.commons.exec.PumpStreamHandler;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.KerberosInterpreter;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Shell interpreter for Zeppelin.
  */
-public class ShellInterpreter extends Interpreter {
+public class ShellInterpreter extends KerberosInterpreter {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ShellInterpreter.class);
   private static final String TIMEOUT_PROPERTY = 
"shell.command.timeout.millisecs";
   private final boolean isWindows = 
System.getProperty("os.name").startsWith("Windows");
@@ -60,12 +60,25 @@ public class ShellInterpreter extends Interpreter {
     LOGGER.info("Command timeout property: {}", getProperty(TIMEOUT_PROPERTY));
     executors = new ConcurrentHashMap<>();
     if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type"))) {
-      ShellSecurityImpl.createSecureConfiguration(getProperty(), shell);
+      startKerberosLoginThread();
     }
   }
 
   @Override
-  public void close() {}
+  public void close() {
+    shutdownExecutorService();
+
+    for (String executorKey : executors.keySet()) {
+      DefaultExecutor executor = executors.remove(executorKey);
+      if (executor != null) {
+        try {
+          executor.getWatchdog().destroyProcess();
+        } catch (Exception e){
+          LOGGER.error("error destroying executor for paragraphId: " + 
executorKey, e);
+        }
+      }
+    }
+  }
 
 
   @Override
@@ -100,7 +113,7 @@ public class ShellInterpreter extends Interpreter {
       if (exitValue == 143) {
         code = Code.INCOMPLETE;
         message += "Paragraph received a SIGTERM\n";
-        LOGGER.info("The paragraph " + contextInterpreter.getParagraphId() 
+        LOGGER.info("The paragraph " + contextInterpreter.getParagraphId()
           + " stopped executing: " + message);
       }
       message += "ExitValue: " + exitValue;
@@ -117,7 +130,11 @@ public class ShellInterpreter extends Interpreter {
   public void cancel(InterpreterContext context) {
     DefaultExecutor executor = executors.remove(context.getParagraphId());
     if (executor != null) {
-      executor.getWatchdog().destroyProcess();
+      try {
+        executor.getWatchdog().destroyProcess();
+      } catch (Exception e){
+        LOGGER.error("error destroying executor for paragraphId: " + 
context.getParagraphId(), e);
+      }
     }
   }
 
@@ -143,4 +160,15 @@ public class ShellInterpreter extends Interpreter {
     return null;
   }
 
+  @Override
+  protected boolean runKerberosLogin() {
+    try {
+      ShellSecurityImpl.createSecureConfiguration(getProperty(), shell);
+    } catch (Exception e) {
+      LOGGER.error("Unable to run kinit for zeppelin", e);
+      return false;
+    }
+    return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/341883d9/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java
index 9115a98..87748ff 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java
@@ -16,6 +16,11 @@
  */
 
 package org.apache.zeppelin.interpreter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Interpreter related constants
  * 
@@ -32,4 +37,17 @@ public class Constants {
 
   public static final int ZEPPELIN_INTERPRETER_OUTPUT_LIMIT = 1024 * 100;
 
+  public static final Map<String, TimeUnit> TIME_SUFFIXES;
+
+  static {
+    TIME_SUFFIXES = new HashMap<>();
+    TIME_SUFFIXES.put("us", TimeUnit.MICROSECONDS);
+    TIME_SUFFIXES.put("ms", TimeUnit.MILLISECONDS);
+    TIME_SUFFIXES.put("s", TimeUnit.SECONDS);
+    TIME_SUFFIXES.put("m", TimeUnit.MINUTES);
+    TIME_SUFFIXES.put("min", TimeUnit.MINUTES);
+    TIME_SUFFIXES.put("h", TimeUnit.HOURS);
+    TIME_SUFFIXES.put("d", TimeUnit.DAYS);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/341883d9/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java
new file mode 100644
index 0000000..4673e48
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.zeppelin.annotation.ZeppelinApi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Interpreter wrapper for Kerberos initialization
+ *
+ * runKerberosLogin() method you need to implement that determine Zeppelin's behavior.
+ * startKerberosLoginThread() needs to be called inside the open() and
+ * shutdownExecutorService() inside close().
+ */
+public abstract class KerberosInterpreter extends Interpreter {
+
+  private Integer kinitFailCount = 0;
+  protected ScheduledExecutorService scheduledExecutorService;
+  public static Logger logger = 
LoggerFactory.getLogger(KerberosInterpreter.class);
+
+  public KerberosInterpreter(Properties property) {
+    super(property);
+  }
+
+  @ZeppelinApi
+  protected abstract boolean runKerberosLogin();
+
+  public String getKerberosRefreshInterval() {
+    if (System.getenv("KERBEROS_REFRESH_INTERVAL") == null) {
+      return "1d";
+    } else {
+      return System.getenv("KERBEROS_REFRESH_INTERVAL");
+    }
+  }
+
+  public Integer kinitFailThreshold() {
+    if (System.getenv("KINIT_FAIL_THRESHOLD") == null) {
+      return 5;
+    } else {
+      return new Integer(System.getenv("KINIT_FAIL_THRESHOLD"));
+    }
+  }
+
+  public Long getTimeAsMs(String time) {
+    if (time == null) {
+      logger.error("Cannot convert to time value.", time);
+      time = "1d";
+    }
+
+    Matcher m = 
Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(time.toLowerCase());
+    if (!m.matches()) {
+      throw new IllegalArgumentException("Invalid time string: " + time);
+    }
+
+    long val = Long.parseLong(m.group(1));
+    String suffix = m.group(2);
+
+    if (suffix != null && !Constants.TIME_SUFFIXES.containsKey(suffix)) {
+      throw new IllegalArgumentException("Invalid suffix: \"" + suffix + "\"");
+    }
+
+    return TimeUnit.MILLISECONDS.convert(val,
+        suffix != null ? Constants.TIME_SUFFIXES.get(suffix) : 
TimeUnit.MILLISECONDS);
+  }
+
+  protected ScheduledExecutorService startKerberosLoginThread() {
+    scheduledExecutorService = Executors.newScheduledThreadPool(1);
+
+    scheduledExecutorService.schedule(new Callable() {
+      public Object call() throws Exception {
+
+        if (runKerberosLogin()) {
+          logger.info("Ran runKerberosLogin command successfully.");
+          kinitFailCount = 0;
+          // schedule another kinit run with a fixed delay.
+          scheduledExecutorService
+              .schedule(this, getTimeAsMs(getKerberosRefreshInterval()), 
TimeUnit.MILLISECONDS);
+        } else {
+          kinitFailCount++;
+          logger.info("runKerberosLogin failed for " + kinitFailCount + " time(s).");
+          // schedule another retry at once or close the interpreter if too many times kinit fails
+          if (kinitFailCount >= kinitFailThreshold()) {
+            logger.error("runKerberosLogin failed for  max attempts, calling close interpreter.");
+            close();
+          } else {
+            scheduledExecutorService.submit(this);
+          }
+        }
+        return null;
+      }
+    }, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS);
+
+    return scheduledExecutorService;
+  }
+
+  protected void shutdownExecutorService() {
+    if (scheduledExecutorService != null) {
+      scheduledExecutorService.shutdown();
+    }
+  }
+
+}
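
The renew-and-retry scheduling in `startKerberosLoginThread()` above can be read as the following pattern: on success, reset the failure counter and schedule the next run after the refresh interval; on failure, retry immediately until the threshold is reached, then give up. The sketch below is a simplified standalone version under stated assumptions. `RenewalLoopSketch`, its constructor parameters, and the `onGiveUp` callback are hypothetical names; the commit calls `close()` on the interpreter instead of a callback and reads the interval and threshold from environment variables.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical illustration of the scheduling pattern; not the committed class.
final class RenewalLoopSketch {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private final AtomicInteger failCount = new AtomicInteger();
  private final BooleanSupplier login;  // stands in for runKerberosLogin()
  private final long intervalMs;        // stands in for the refresh interval
  private final int failThreshold;      // stands in for KINIT_FAIL_THRESHOLD
  private final Runnable onGiveUp;      // stands in for close()

  RenewalLoopSketch(BooleanSupplier login, long intervalMs,
                    int failThreshold, Runnable onGiveUp) {
    this.login = login;
    this.intervalMs = intervalMs;
    this.failThreshold = failThreshold;
    this.onGiveUp = onGiveUp;
  }

  // Schedules the first renewal one interval from now.
  void start() {
    scheduler.schedule(this::attempt, intervalMs, TimeUnit.MILLISECONDS);
  }

  private void attempt() {
    if (login.getAsBoolean()) {
      // Success: reset the counter and wait a full interval before renewing again.
      failCount.set(0);
      scheduler.schedule(this::attempt, intervalMs, TimeUnit.MILLISECONDS);
    } else if (failCount.incrementAndGet() >= failThreshold) {
      // Too many consecutive failures: stop scheduling and notify the owner.
      scheduler.shutdown();
      onGiveUp.run();
    } else {
      // Failure below the threshold: retry at once.
      scheduler.submit(this::attempt);
    }
  }

  // Cancels any pending renewal; intended to be called when the owner closes.
  void stop() {
    scheduler.shutdownNow();
  }
}
```

A caller would construct the loop with its own login routine, call `start()` when it opens, and call `stop()` when it closes. An exception thrown by the login routine would silently end the loop in this sketch, so a real implementation would likely catch it and count it as a failure, as `ShellInterpreter.runKerberosLogin()` does in this commit.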
