This is an automated email from the ASF dual-hosted git repository. moon pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 6225868 [ZEPPELIN-4722] User Impersonation via --proxy-user for Spark Interpreter with K8s 6225868 is described below commit 62258682c103732a516c36e6ac4a2a22bf800278 Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Wed Apr 8 09:39:30 2020 +0200 [ZEPPELIN-4722] User Impersonation via --proxy-user for Spark Interpreter with K8s ### What is this PR for? This pull request fixes the impersonation issue when running the Zeppelin Spark interpreter on K8s. General user impersonation should be neither necessary nor possible on K8s, since the interpreter process runs unprivileged - without the right to change the user via ssh. ### What type of PR is it? * Improvement ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4722 ### How should this be tested? * **Travis-CI**: https://travis-ci.org/github/Reamer/zeppelin/builds/674723139 ### Questions: * Do the license files need an update? No * Are there any breaking changes for older versions? No * Does this need documentation?
No Author: Philipp Dallig <philipp.dal...@gmail.com> Closes #3733 from Reamer/proxy_user and squashes the following commits: 8c2e58bb5 [Philipp Dallig] proxy user in spark on k8s 3989c4e87 [Philipp Dallig] Some Cleanup (cherry picked from commit 85fc206b4f1504dc29da8264b3619ed31006a3b2) Signed-off-by: Lee moon soo <m...@apache.org> --- .../zeppelin/conf/ZeppelinConfiguration.java | 5 + .../launcher/K8sRemoteInterpreterProcess.java | 46 ++++---- .../launcher/K8sStandardInterpreterLauncher.java | 23 ++-- .../zeppelin/interpreter/launcher/Kubectl.java | 11 +- .../launcher/K8sRemoteInterpreterProcessTest.java | 117 +++++++++++++++++++-- .../launcher/SparkInterpreterLauncher.java | 5 +- 6 files changed, 164 insertions(+), 43 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 1be243f..3deb8af 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -697,6 +697,10 @@ public class ZeppelinConfiguration extends XMLConfiguration { return getString(ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS); } + public boolean getZeppelinImpersonateSparkProxyUser() { + return getBoolean(ConfVars.ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER); + } + public String getZeppelinNotebookGitURL() { return getString(ConfVars.ZEPPELIN_NOTEBOOK_GIT_REMOTE_URL); } @@ -998,6 +1002,7 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_DOCKER_CONTAINER_IMAGE("zeppelin.docker.container.image", "apache/zeppelin:" + Util.getVersion()), + ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER("zeppelin.impersonate.spark.proxy.user", true), ZEPPELIN_NOTEBOOK_GIT_REMOTE_URL("zeppelin.notebook.git.remote.url", ""), ZEPPELIN_NOTEBOOK_GIT_REMOTE_USERNAME("zeppelin.notebook.git.remote.username", "token"), 
ZEPPELIN_NOTEBOOK_GIT_REMOTE_ACCESS_TOKEN("zeppelin.notebook.git.remote.access-token", ""), diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java index 959718d..5f872cc 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java @@ -13,13 +13,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.hubspot.jinjava.Jinjava; import org.apache.commons.exec.ExecuteWatchdog; import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { - private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class); + private static final Logger LOGGER = LoggerFactory.getLogger(K8sRemoteInterpreterProcess.class); private static final int K8S_INTERPRETER_SERVICE_PORT = 12321; private final Kubectl kubectl; private final String interpreterGroupId; @@ -39,6 +41,9 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { private ExecuteWatchdog portForwardWatchdog; private int podPort = K8S_INTERPRETER_SERVICE_PORT; + private final boolean isUserImpersonatedForSpark; + private String userName; + private AtomicBoolean started = new AtomicBoolean(false); public K8sRemoteInterpreterProcess( @@ -54,7 +59,8 @@ public class K8sRemoteInterpreterProcess 
extends RemoteInterpreterProcess { String zeppelinServiceRpcPort, boolean portForward, String sparkImage, - int connectTimeout + int connectTimeout, + boolean isUserImpersonatedForSpark ) { super(connectTimeout); this.kubectl = kubectl; @@ -64,12 +70,13 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { this.interpreterGroupName = interpreterGroupName; this.interpreterSettingName = interpreterSettingName; this.properties = properties; - this.envs = new HashMap(envs); + this.envs = new HashMap<>(envs); this.zeppelinServiceHost = zeppelinServiceHost; this.zeppelinServiceRpcPort = zeppelinServiceRpcPort; this.portForward = portForward; this.sparkImage = sparkImage; this.podName = interpreterGroupName.toLowerCase() + "-" + getRandomString(6); + this.isUserImpersonatedForSpark = isUserImpersonatedForSpark; } @@ -89,6 +96,14 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { @Override public void start(String userName) throws IOException { + /** + * If a spark interpreter process is running, userName is set in preparation for --proxy-user + */ + if (isUserImpersonatedForSpark && !StringUtils.containsIgnoreCase(userName, "anonymous") && isSpark()) { + this.userName = userName; + } else { + this.userName = null; + } // create new pod apply(specTempaltes, false); kubectl.wait(String.format("pod/%s", getPodName()), "condition=Ready", getConnectTimeout()/1000); @@ -116,9 +131,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { } if (!started.get()) { - LOGGER.info( - String.format("Interpreter pod creation is time out in %d seconds", - getConnectTimeout()/1000)); + LOGGER.info("Interpreter pod creation is time out in {} seconds", getConnectTimeout()/1000); } // waits for interpreter thrift rpc server ready @@ -210,7 +223,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { */ void apply(File path, boolean delete) throws IOException { if 
(path.getName().startsWith(".") || path.isHidden() || path.getName().endsWith("~")) { - LOGGER.info("Skip " + path.getAbsolutePath()); + LOGGER.info("Skip {}", path.getAbsolutePath()); } if (path.isDirectory()) { @@ -224,7 +237,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { apply(f, delete); } } else if (path.isFile()) { - LOGGER.info("Apply " + path.getAbsolutePath()); + LOGGER.info("Apply {}", path.getAbsolutePath()); K8sSpecTemplate specTemplate = new K8sSpecTemplate(); specTemplate.loadProperties(getTemplateBindings()); @@ -235,12 +248,12 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { kubectl.apply(spec); } } else { - LOGGER.error("Can't apply " + path.getAbsolutePath()); + LOGGER.error("Can't apply {}", path.getAbsolutePath()); } } @VisibleForTesting - Properties getTemplateBindings() throws IOException { + Properties getTemplateBindings() { Properties k8sProperties = new Properties(); // k8s template properties @@ -318,11 +331,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { boolean isSparkOnKubernetes(Properties interpreteProperties) { String propertySparkMaster = (String) interpreteProperties.getOrDefault("master", ""); - if (propertySparkMaster.startsWith("k8s://")) { - return true; - } else { - return false; - } + return propertySparkMaster.startsWith("k8s://"); } @VisibleForTesting @@ -334,6 +343,9 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { if (properties.containsKey("spark.driver.memory")) { options.append(" --driver-memory " + properties.get("spark.driver.memory")); } + if (userName != null) { + options.append(" --proxy-user " + userName); + } options.append(" --conf spark.kubernetes.namespace=" + kubectl.getNamespace()); options.append(" --conf spark.executor.instances=1"); options.append(" --conf spark.kubernetes.driver.pod.name=" + getPodName()); @@ -392,9 +404,7 @@ public class K8sRemoteInterpreterProcess extends 
RemoteInterpreterProcess { char c = chars[random.nextInt(chars.length)]; sb.append(c); } - String randomStr = sb.toString(); - - return randomStr; + return sb.toString(); } @Override diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java index 3f2e39d..085c3f9 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java @@ -70,11 +70,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher { * @return */ boolean isRunningOnKubernetes() { - if (new File("/var/run/secrets/kubernetes.io").exists()) { - return true; - } else { - return false; - } + return new File("/var/run/secrets/kubernetes.io").exists(); } /** @@ -130,9 +126,21 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher { } } + /** + * Interpreter Process will run in K8s. There is no point in changing the user after starting the container. + * Switching to an other user (non-privileged) should be done during the image creation process. 
+ * + * Only if a spark interpreter process is running, userImpersonatation should be possible for --proxy-user + */ + private boolean isUserImpersonateForSparkInterpreter(InterpreterLaunchContext context) { + return zConf.getZeppelinImpersonateSparkProxyUser() && + context.getOption().isUserImpersonate() && + "spark".equalsIgnoreCase(context.getInterpreterGroupId()); + } + @Override public InterpreterClient launch(InterpreterLaunchContext context) throws IOException { - LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup()); + LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup()); this.context = context; this.properties = context.getProperties(); int connectTimeout = getConnectTimeout(); @@ -150,7 +158,8 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher { getZeppelinServiceRpcPort(), zConf.getK8sPortForward(), zConf.getK8sSparkContainerImage(), - connectTimeout); + connectTimeout, + isUserImpersonateForSparkInterpreter(context)); } protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) { diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java index 2079d16..39e7e92 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java @@ -18,7 +18,6 @@ package org.apache.zeppelin.interpreter.launcher; import com.google.common.annotations.VisibleForTesting; -import com.google.gson.Gson; import java.util.ArrayList; import java.util.Arrays; @@ -31,9 +30,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Kubectl { - private final Logger LOGGER = LoggerFactory.getLogger(Kubectl.class); + private static final 
Logger LOGGER = LoggerFactory.getLogger(Kubectl.class); private final String kubectlCmd; - private final Gson gson = new Gson(); private String namespace; public Kubectl(String kubectlCmd) { @@ -118,7 +116,7 @@ public class Kubectl { argsToOverride.add("--namespace=" + namespace); } - LOGGER.info("kubectl " + argsToOverride); + LOGGER.info("kubectl {}", argsToOverride); LOGGER.debug(stdin); try { @@ -130,8 +128,7 @@ public class Kubectl { ); if (exitCode == 0) { - String output = new String(stdout.toByteArray()); - return output; + return new String(stdout.toByteArray()); } else { String output = new String(stderr.toByteArray()); throw new IOException(String.format("non zero return code (%d). %s", exitCode, output)); @@ -147,7 +144,7 @@ public class Kubectl { CommandLine cmd = new CommandLine(kubectlCmd); cmd.addArguments(args); - ExecuteWatchdog watchdog = new ExecuteWatchdog(60 * 1000); + ExecuteWatchdog watchdog = new ExecuteWatchdog(60 * 1000L); executor.setWatchdog(watchdog); PumpStreamHandler streamHandler = new PumpStreamHandler(stdout, stderr, stdin); diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java index 9d6b634..1459810 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Properties; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -53,11 +54,8 @@ public class 
K8sRemoteInterpreterProcessTest { "12320", false, "spark-container:1.0", - 10); - - // when - String host = intp.getHost(); - int port = intp.getPort(); + 10, + false); // then assertEquals(String.format("%s.%s.svc", intp.getPodName(), kubectl.getNamespace()), intp.getHost()); @@ -86,7 +84,8 @@ public class K8sRemoteInterpreterProcessTest { "12320", false, "spark-container:1.0", - 10); + 10, + false); // following values are hardcoded in k8s/interpreter/100-interpreter.yaml. @@ -120,7 +119,8 @@ public class K8sRemoteInterpreterProcessTest { "12320", false, "spark-container:1.0", - 10); + 10, + false); // when Properties p = intp.getTemplateBindings(); @@ -172,9 +172,11 @@ public class K8sRemoteInterpreterProcessTest { "12320", false, "spark-container:1.0", - 10); + 10, + false); // when + intp.start("mytestUser"); Properties p = intp.getTemplateBindings(); // then @@ -192,6 +194,105 @@ public class K8sRemoteInterpreterProcessTest { assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost())); assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort())); assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort())); + assertFalse(sparkSubmitOptions.contains("--proxy-user")); + assertTrue(intp.isSpark()); + } + + @Test + public void testGetTemplateBindingsForSparkWithProxyUser() throws IOException { + // given + Kubectl kubectl = mock(Kubectl.class); + when(kubectl.getNamespace()).thenReturn("default"); + + Properties properties = new Properties(); + properties.put("my.key1", "v1"); + properties.put("master", "k8s://http://api"); + HashMap<String, String> envs = new HashMap<String, String>(); + envs.put("MY_ENV1", "V1"); + envs.put("SPARK_SUBMIT_OPTIONS", "my options"); + envs.put("SERVICE_DOMAIN", "mydomain"); + + K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( + kubectl, + new File(".skip"), + "interpreter-container:1.0", + "shared_process", + "spark", + 
"myspark", + properties, + envs, + "zeppelin.server.hostname", + "12320", + false, + "spark-container:1.0", + 10, + true); + + // when + intp.start("mytestUser"); + Properties p = intp.getTemplateBindings(); + // then + assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image")); + assertEquals(String.format("//4040-%s.%s", intp.getPodName(), "mydomain"), p.get("zeppelin.spark.uiWebUrl")); + + envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs"); + assertTrue( envs.containsKey("SPARK_HOME")); + + String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS"); + assertTrue(sparkSubmitOptions.startsWith("my options ")); + assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=" + kubectl.getNamespace())); + assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName())); + assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0")); + assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost())); + assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort())); + assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort())); + assertTrue(sparkSubmitOptions.contains("--proxy-user mytestUser")); + assertTrue(intp.isSpark()); + } + + @Test + public void testGetTemplateBindingsForSparkWithProxyUserAnonymous() throws IOException { + // given + Kubectl kubectl = mock(Kubectl.class); + when(kubectl.getNamespace()).thenReturn("default"); + + Properties properties = new Properties(); + properties.put("my.key1", "v1"); + properties.put("master", "k8s://http://api"); + HashMap<String, String> envs = new HashMap<String, String>(); + envs.put("MY_ENV1", "V1"); + envs.put("SPARK_SUBMIT_OPTIONS", "my options"); + envs.put("SERVICE_DOMAIN", "mydomain"); + + K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( + kubectl, + new File(".skip"), + "interpreter-container:1.0", + 
"shared_process", + "spark", + "myspark", + properties, + envs, + "zeppelin.server.hostname", + "12320", + false, + "spark-container:1.0", + 10, + true); + + // when + intp.start("anonymous"); + Properties p = intp.getTemplateBindings(); + // then + assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image")); + assertEquals(String.format("//4040-%s.%s", intp.getPodName(), "mydomain"), p.get("zeppelin.spark.uiWebUrl")); + + envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs"); + assertTrue( envs.containsKey("SPARK_HOME")); + + String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS"); + assertFalse(sparkSubmitOptions.contains("--proxy-user")); + assertTrue(intp.isSpark()); } @Test diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java index 02e8f1b..7dc888f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java @@ -156,9 +156,8 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { for (String name : sparkProperties.stringPropertyNames()) { sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name)); } - String useProxyUserEnv = System.getenv("ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER"); - if (context.getOption().isUserImpersonate() && (StringUtils.isBlank(useProxyUserEnv) || - !useProxyUserEnv.equals("false"))) { + + if (context.getOption().isUserImpersonate() && zConf.getZeppelinImpersonateSparkProxyUser()) { sparkConfBuilder.append(" --proxy-user " + context.getUserName()); }