Repository: zeppelin Updated Branches: refs/heads/master a39ce0141 -> 341883d9e
[ZEPPELIN-1907] Shell Interpreter does not renew ticket on secure cluster ### What is this PR for? Kerberos ticket and renew lifetime are set to 1 hour. On accessing secure Hadoop from shell interpreter, it does kinit and returns result successfully but after 1 hour, the ticket gets expired and Hadoop list fails with below exception. ``` %sh hadoop fs -ls / 17/01/05 09:29:45 WARN ipc.Client: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)] at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211) at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:413) at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:595) at org.apache.hadoop.ipc.Client$Connection.access$2000(Client.java:397) at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:762) at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:758) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724) at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:757) at org.apache.hadoop.ipc.Client$Connection.access$3200(Client.java:397) at org.apache.hadoop.ipc.Client.getConnection(Client.java:1618) at org.apache.hadoop.ipc.Client.call(Client.java:1449) at org.apache.hadoop.ipc.Client.call(Client.java:1396) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233) at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source) ls: Failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]; Host Details : local host is: "zeppelin1.hwxblr.com/10.0.1.57"; destination host is: "zeppelin1.hwxblr.com":8020; ExitValue: 1 ``` ### What type of PR is it? [Bug Fix] ### What is the Jira issue? * [ZEPPELIN-1907](https://issues.apache.org/jira/browse/ZEPPELIN-1907) ### How should this be tested? On a Kerberos enabled cluster, run this paragraph ``` %sh hdfs dfs -ls /user/zeppelin/ ``` Wait for key-tab to expire (or run `kdestroy`), and re-run the same paragraph. ### Screenshots (if appropriate) Before: <img width="1438" alt="screen shot 2017-06-13 at 3 44 30 pm" src="https://user-images.githubusercontent.com/674497/27078184-511ed810-5050-11e7-8afa-90247f33047a.png"> After: <img width="1438" alt="screen shot 2017-06-13 at 3 44 04 pm" src="https://user-images.githubusercontent.com/674497/27078183-5109d690-5050-11e7-82e4-d79a5e98295f.png"> ### Questions: * Does the licenses files need update? * Is there breaking changes for older versions? * Does this needs documentation? Author: Prabhjyot Singh <prabhjyotsi...@gmail.com> Author: Prabhjyot Singh <prabhjyotsi...@gmail.com> Closes #2407 from prabhjyotsingh/ZEPPELIN-1907 and squashes the following commits: ffd5f11b2 [Prabhjyot Singh] add bash after ` ecc1a7ce0 [Prabhjyot Singh] Merge remote-tracking branch 'origin/master' into ZEPPELIN-1907 9243c6ab9 [Prabhjyot Singh] replace `###` with `##` 443c407d3 [Prabhjyot Singh] add space before time(s) adf23743b [Prabhjyot Singh] update documentation. 289b7d346 [Prabhjyot Singh] reset kinitFailCount on successful renew. 96bfdfe97 [Prabhjyot Singh] log more error 72b32ae25 [Prabhjyot Singh] add java doc df6645a64 [Prabhjyot Singh] add KerberosInterpreter and move kinit loginc there. 856c8716e [Prabhjyot Singh] renew token periodically ee741e483 [Prabhjyot Singh] @zjffdu review comments 7c539ef2e [Prabhjyot Singh] add null check ab823d3ee [Prabhjyot Singh] relogin using keytab, and append message for the same Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/341883d9 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/341883d9 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/341883d9 Branch: refs/heads/master Commit: 341883d9ed92a4079cb486d2b6b2b1d0c40f1575 Parents: a39ce01 Author: Prabhjyot Singh <prabhjyotsi...@gmail.com> Authored: Fri Jun 23 21:51:00 2017 +0530 Committer: Prabhjyot Singh <prabhjyotsi...@gmail.com> Committed: Tue Jun 27 08:50:13 2017 +0530 ---------------------------------------------------------------------- conf/zeppelin-env.sh.template | 5 + docs/interpreter/shell.md | 12 +- .../apache/zeppelin/shell/ShellInterpreter.java | 40 +++++- .../apache/zeppelin/interpreter/Constants.java | 18 +++ .../interpreter/KerberosInterpreter.java | 125 +++++++++++++++++++ 5 files changed, 193 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/341883d9/conf/zeppelin-env.sh.template ---------------------------------------------------------------------- diff --git a/conf/zeppelin-env.sh.template b/conf/zeppelin-env.sh.template index ce55346..a9eccf6 100644 --- a/conf/zeppelin-env.sh.template +++ b/conf/zeppelin-env.sh.template @@ -53,6 +53,11 @@ #### Spark interpreter configuration #### +## Kerberos ticket refresh setting +## +#export KINIT_FAIL_THRESHOLD # (optional) How many times should kinit retry. The default value is 5. +#export LAUNCH_KERBEROS_REFRESH_INTERVAL # (optional) The refresh interval for Kerberos ticket. The default value is 1d. + ## Use provided spark installation ## ## defining SPARK_HOME makes Zeppelin run spark interpreter process using spark-submit ## http://git-wip-us.apache.org/repos/asf/zeppelin/blob/341883d9/docs/interpreter/shell.md ---------------------------------------------------------------------- diff --git a/docs/interpreter/shell.md b/docs/interpreter/shell.md index a3d8cea..4db73d9 100644 --- a/docs/interpreter/shell.md +++ b/docs/interpreter/shell.md @@ -66,4 +66,14 @@ The following example demonstrates the basic usage of Shell in a Zeppelin notebo <img src="/assets/themes/zeppelin/img/docs-img/shell-example.png" /> If you need further information about **Zeppelin Interpreter Setting** for using Shell interpreter, -please read [What is interpreter setting?](../usage/interpreter/overview.html#what-is-interpreter-setting) section first. \ No newline at end of file +please read [What is interpreter setting?](../usage/interpreter/overview.html#what-is-interpreter-setting) section first. + +## Kerberos refresh interval +For changing the default behavior of when to renew Kerberos ticket following changes can be made in `conf/zeppelin-env.sh`. + +```bash +# Change Kerberos refresh interval (default value is 1d). Allowed postfix are ms, s, m, min, h, and d. +export LAUNCH_KERBEROS_REFRESH_INTERVAL=4h +# Change kinit number retries (default value is 5), which means if the kinit command fails for 5 retries consecutively it will close the interpreter. +export KINIT_FAIL_THRESHOLD=10 +``` http://git-wip-us.apache.org/repos/asf/zeppelin/blob/341883d9/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java index ec75684..79fc3a3 100644 --- a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java +++ b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java @@ -30,8 +30,8 @@ import org.apache.commons.exec.ExecuteException; import org.apache.commons.exec.ExecuteWatchdog; import org.apache.commons.exec.PumpStreamHandler; import org.apache.commons.lang3.StringUtils; -import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.KerberosInterpreter; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; @@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory; /** * Shell interpreter for Zeppelin. */ -public class ShellInterpreter extends Interpreter { +public class ShellInterpreter extends KerberosInterpreter { private static final Logger LOGGER = LoggerFactory.getLogger(ShellInterpreter.class); private static final String TIMEOUT_PROPERTY = "shell.command.timeout.millisecs"; private final boolean isWindows = System.getProperty("os.name").startsWith("Windows"); @@ -60,12 +60,25 @@ public class ShellInterpreter extends Interpreter { LOGGER.info("Command timeout property: {}", getProperty(TIMEOUT_PROPERTY)); executors = new ConcurrentHashMap<>(); if (!StringUtils.isAnyEmpty(getProperty("zeppelin.shell.auth.type"))) { - ShellSecurityImpl.createSecureConfiguration(getProperty(), shell); + startKerberosLoginThread(); } } @Override - public void close() {} + public void close() { + shutdownExecutorService(); + + for (String executorKey : executors.keySet()) { + DefaultExecutor executor = executors.remove(executorKey); + if (executor != null) { + try { + executor.getWatchdog().destroyProcess(); + } catch (Exception e){ + LOGGER.error("error destroying executor for paragraphId: " + executorKey, e); + } + } + } + } @Override @@ -100,7 +113,7 @@ public class ShellInterpreter extends Interpreter { if (exitValue == 143) { code = Code.INCOMPLETE; message += "Paragraph received a SIGTERM\n"; - LOGGER.info("The paragraph " + contextInterpreter.getParagraphId() + LOGGER.info("The paragraph " + contextInterpreter.getParagraphId() + " stopped executing: " + message); } message += "ExitValue: " + exitValue; @@ -117,7 +130,11 @@ public class ShellInterpreter extends Interpreter { public void cancel(InterpreterContext context) { DefaultExecutor executor = executors.remove(context.getParagraphId()); if (executor != null) { - executor.getWatchdog().destroyProcess(); + try { + executor.getWatchdog().destroyProcess(); + } catch (Exception e){ + LOGGER.error("error destroying executor for paragraphId: " + context.getParagraphId(), e); + } } } @@ -143,4 +160,15 @@ public class ShellInterpreter extends Interpreter { return null; } + @Override + protected boolean runKerberosLogin() { + try { + ShellSecurityImpl.createSecureConfiguration(getProperty(), shell); + } catch (Exception e) { + LOGGER.error("Unable to run kinit for zeppelin", e); + return false; + } + return true; + } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/341883d9/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java index 9115a98..87748ff 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Constants.java @@ -16,6 +16,11 @@ */ package org.apache.zeppelin.interpreter; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + /** * Interpreter related constants * @@ -32,4 +37,17 @@ public class Constants { public static final int ZEPPELIN_INTERPRETER_OUTPUT_LIMIT = 1024 * 100; + public static final Map<String, TimeUnit> TIME_SUFFIXES; + + static { + TIME_SUFFIXES = new HashMap<>(); + TIME_SUFFIXES.put("us", TimeUnit.MICROSECONDS); + TIME_SUFFIXES.put("ms", TimeUnit.MILLISECONDS); + TIME_SUFFIXES.put("s", TimeUnit.SECONDS); + TIME_SUFFIXES.put("m", TimeUnit.MINUTES); + TIME_SUFFIXES.put("min", TimeUnit.MINUTES); + TIME_SUFFIXES.put("h", TimeUnit.HOURS); + TIME_SUFFIXES.put("d", TimeUnit.DAYS); + } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/341883d9/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java new file mode 100644 index 0000000..4673e48 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/KerberosInterpreter.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.zeppelin.annotation.ZeppelinApi; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Interpreter wrapper for Kerberos initialization + * + * runKerberosLogin() method you need to implement that determine Zeppelin's behavior. + * startKerberosLoginThread() needs to be called inside the open() and + * shutdownExecutorService() inside close(). + */ +public abstract class KerberosInterpreter extends Interpreter { + + private Integer kinitFailCount = 0; + protected ScheduledExecutorService scheduledExecutorService; + public static Logger logger = LoggerFactory.getLogger(KerberosInterpreter.class); + + public KerberosInterpreter(Properties property) { + super(property); + } + + @ZeppelinApi + protected abstract boolean runKerberosLogin(); + + public String getKerberosRefreshInterval() { + if (System.getenv("KERBEROS_REFRESH_INTERVAL") == null) { + return "1d"; + } else { + return System.getenv("KERBEROS_REFRESH_INTERVAL"); + } + } + + public Integer kinitFailThreshold() { + if (System.getenv("KINIT_FAIL_THRESHOLD") == null) { + return 5; + } else { + return new Integer(System.getenv("KINIT_FAIL_THRESHOLD")); + } + } + + public Long getTimeAsMs(String time) { + if (time == null) { + logger.error("Cannot convert to time value.", time); + time = "1d"; + } + + Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(time.toLowerCase()); + if (!m.matches()) { + throw new IllegalArgumentException("Invalid time string: " + time); + } + + long val = Long.parseLong(m.group(1)); + String suffix = m.group(2); + + if (suffix != null && !Constants.TIME_SUFFIXES.containsKey(suffix)) { + throw new IllegalArgumentException("Invalid suffix: \"" + suffix + "\""); + } + + return TimeUnit.MILLISECONDS.convert(val, + suffix != null ? Constants.TIME_SUFFIXES.get(suffix) : TimeUnit.MILLISECONDS); + } + + protected ScheduledExecutorService startKerberosLoginThread() { + scheduledExecutorService = Executors.newScheduledThreadPool(1); + + scheduledExecutorService.schedule(new Callable() { + public Object call() throws Exception { + + if (runKerberosLogin()) { + logger.info("Ran runKerberosLogin command successfully."); + kinitFailCount = 0; + // schedule another kinit run with a fixed delay. + scheduledExecutorService + .schedule(this, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS); + } else { + kinitFailCount++; + logger.info("runKerberosLogin failed for " + kinitFailCount + " time(s)."); + // schedule another retry at once or close the interpreter if too many times kinit fails + if (kinitFailCount >= kinitFailThreshold()) { + logger.error("runKerberosLogin failed for max attempts, calling close interpreter."); + close(); + } else { + scheduledExecutorService.submit(this); + } + } + return null; + } + }, getTimeAsMs(getKerberosRefreshInterval()), TimeUnit.MILLISECONDS); + + return scheduledExecutorService; + } + + protected void shutdownExecutorService() { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdown(); + } + } + +}