This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 8af054e [ZEPPELIN-4915] K8s with java lib 8af054e is described below commit 8af054e851a5f3cae5238a311600279d630354ed Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Wed Jul 8 18:21:37 2020 +0200 [ZEPPELIN-4915] K8s with java lib ### What is this PR for? This PR changes from the kubectl binary to a Java library for connecting to a Kubernetes cluster. I decided to use [fabric8/kubernetes-client](https://github.com/fabric8io/kubernetes-client), because it's also used by the [Apache Spark project](https://github.com/fabric8io/kubernetes-client#who-uses-kubernetes--openshift-java-client) and allows the import of our generic files rendered with jinja2. ### What type of PR is it? - Improvement ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4915 ### How should this be tested? * Travis-CI: https://travis-ci.org/github/Reamer/zeppelin/builds/706228986 ### Questions: * Do the license files need an update? Yes * Are there breaking changes for older versions? No * Does this need documentation? 
No Author: Philipp Dallig <philipp.dal...@gmail.com> Closes #3825 from Reamer/k8s_with_java_lib and squashes the following commits: 2cb46a7c4 [Philipp Dallig] Describe how an Auth-Token can be provided for communication with K8s c6a7cda78 [Philipp Dallig] Added the ability to use a custom namespace for development 1f50b3971 [Philipp Dallig] Correct rendering of template 495c90bb1 [Philipp Dallig] accept the proposals of the review 37acc5e25 [Philipp Dallig] Add license for kubernetes-client a95bd1988 [Philipp Dallig] Some cleanup 827cb5df4 [Philipp Dallig] Rewrite thread handling for K8s Launcher 143a5b5ec [Philipp Dallig] Switch from kubectl binary to java library ce9fce12a [Philipp Dallig] Update jinja to a new version bdbcec83f [Philipp Dallig] Add LstripBlock and TrimBlocks in Jinja rendering c4906f27b [Philipp Dallig] cleanup RemoteInterpreterUtils (cherry picked from commit b70a1b81b4fe1bea81313721f3ab1abcab0d7929) Signed-off-by: Philipp Dallig <philipp.dal...@gmail.com> --- LICENSE | 1 + docs/quickstart/kubernetes.md | 3 +- scripts/docker/zeppelin/bin/Dockerfile | 7 - .../zeppelin/conf/ZeppelinConfiguration.java | 6 +- .../interpreter/launcher/InterpreterLauncher.java | 19 +- .../interpreter/remote/RemoteInterpreterUtils.java | 28 +-- zeppelin-plugins/launcher/k8s-standard/pom.xml | 77 ++++++++- .../launcher/K8sRemoteInterpreterProcess.java | 191 ++++++++++----------- .../interpreter/launcher/K8sSpecTemplate.java | 11 +- .../launcher/K8sStandardInterpreterLauncher.java | 33 ++-- .../zeppelin/interpreter/launcher/Kubectl.java | 155 ----------------- .../launcher/K8sRemoteInterpreterProcessTest.java | 91 ++++------ .../interpreter/launcher/K8sSpecTemplateTest.java | 15 ++ .../K8sStandardInterpreterLauncherTest.java | 43 ++--- .../zeppelin/interpreter/launcher/KubectlTest.java | 105 ----------- 15 files changed, 274 insertions(+), 511 deletions(-) diff --git a/LICENSE b/LICENSE index d6460e9..5bd3e1a 100644 --- a/LICENSE +++ b/LICENSE @@ -267,6 +267,7 @@ The 
text of each license is also included at licenses/LICENSE-[project]-[version (Apache 2.0) Embedded MongoDB (https://github.com/flapdoodle-oss/de.flapdoodle.embed.mongo) (Apache 2.0) Kotlin (https://github.com/JetBrains/kotlin) (Apache 2.0) s3proxy (https://github.com/gaul/s3proxy) + (Apache 2.0) kubernetes-client (https://github.com/fabric8io/kubernetes-client) ======================================================================== BSD 3-Clause licenses diff --git a/docs/quickstart/kubernetes.md b/docs/quickstart/kubernetes.md index cd45912..2cfbbca 100644 --- a/docs/quickstart/kubernetes.md +++ b/docs/quickstart/kubernetes.md @@ -264,5 +264,6 @@ Zeppelin can run locally (such as inside your IDE in debug mode) and able to run | ZEPPELIN_K8S_PORTFORWARD | true | Enable port forwarding from local Zeppelin instance to Interpreters running on Kubernetes | | ZEPPELIN_K8S_CONTAINER_IMAGE | <image>:<version> | Zeppelin interpreter docker image to use | | ZEPPELIN_K8S_SPARK_CONTAINER_IMAGE | <image>:<version> | Spark docker image to use | +| ZEPPELIN_K8S_NAMESPACE | <k8s namespace> | Kubernetes namespace to use | +| KUBERNETES_AUTH_TOKEN | <token> | Kubernetes auth token to create resources | -`kubectl` command need to be configured to connect your Kubernetes cluster. 
diff --git a/scripts/docker/zeppelin/bin/Dockerfile b/scripts/docker/zeppelin/bin/Dockerfile index 6745b46..e1bcba8 100644 --- a/scripts/docker/zeppelin/bin/Dockerfile +++ b/scripts/docker/zeppelin/bin/Dockerfile @@ -97,13 +97,6 @@ RUN echo "$LOG_TAG Install R related packages" && \ R -e "install.packages('Rcpp', repos='http://cran.us.r-project.org')" && \ Rscript -e "library('devtools'); library('Rcpp'); install_github('ramnathv/rCharts')" -# Install kubectl -RUN apt-get install -y apt-transport-https && \ - curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - && \ - echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list && \ - apt-get update && \ - apt-get install -y kubectl - RUN echo "$LOG_TAG Cleanup" && \ apt-get autoclean && \ apt-get clean diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 7331a50..112bdbf 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -828,8 +828,8 @@ public class ZeppelinConfiguration extends XMLConfiguration { return getBoolean(ConfVars.ZEPPELIN_K8S_PORTFORWARD); } - public String getK8sKubectlCmd() { - return getString(ConfVars.ZEPPELIN_K8S_KUBECTL); + public String getK8sNamepsace() { + return getString(ConfVars.ZEPPELIN_K8S_NAMESPACE); } public String getK8sContainerImage() { @@ -1032,8 +1032,8 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_RUN_MODE("zeppelin.run.mode", "auto"), // auto | local | k8s | Docker ZEPPELIN_K8S_PORTFORWARD("zeppelin.k8s.portforward", false), // kubectl port-forward incase of Zeppelin is running outside of kuberentes - ZEPPELIN_K8S_KUBECTL("zeppelin.k8s.kubectl", "kubectl"), // kubectl command 
ZEPPELIN_K8S_CONTAINER_IMAGE("zeppelin.k8s.container.image", "apache/zeppelin:" + Util.getVersion()), + ZEPPELIN_K8S_NAMESPACE("zeppelin.k8s.namespace", "default"), // specify a namespace incase of Zeppelin is running outside of kuberentes ZEPPELIN_K8S_SPARK_CONTAINER_IMAGE("zeppelin.k8s.spark.container.image", "apache/spark:latest"), ZEPPELIN_K8S_TEMPLATE_DIR("zeppelin.k8s.template.dir", "k8s"), ZEPPELIN_K8S_SERVICE_NAME("zeppelin.k8s.service.name", "zeppelin-server"), diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java index 1fb2ea9..0d90fc4 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java @@ -17,23 +17,21 @@ package org.apache.zeppelin.interpreter.launcher; +import java.io.IOException; +import java.util.Properties; + import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.InterpreterOption; -import org.apache.zeppelin.interpreter.InterpreterRunner; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Properties; - /** * Component to Launch interpreter process. 
*/ public abstract class InterpreterLauncher { - private static Logger LOGGER = LoggerFactory.getLogger(InterpreterLauncher.class); - private static String SPECIAL_CHARACTER="{}()<>&*‘|=?;[]$–#~!.\"%/\\:+,`"; + private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterLauncher.class); + private static final String SPECIAL_CHARACTER="{}()<>&*‘|=?;[]$–#~!.\"%/\\:+,`"; protected ZeppelinConfiguration zConf; protected Properties properties; @@ -89,14 +87,13 @@ public abstract class InterpreterLauncher { recoveryStorage.getInterpreterClient(context.getInterpreterGroupId()); if (recoveredClient != null) { if (recoveredClient.isRunning()) { - LOGGER.info("Recover interpreter process running at {} of interpreter group: {}", - recoveredClient.getHost() + ":" + recoveredClient.getPort(), + LOGGER.info("Recover interpreter process running at {}:{} of interpreter group: {}", + recoveredClient.getHost(), recoveredClient.getPort(), recoveredClient.getInterpreterGroupId()); return recoveredClient; } else { recoveryStorage.removeInterpreterClient(context.getInterpreterGroupId()); - LOGGER.warn("Unable to recover interpreter process: " + recoveredClient.getHost() + ":" - + recoveredClient.getPort() + ", as it is already terminated."); + LOGGER.warn("Unable to recover interpreter process: {}:{}, as it is already terminated.", recoveredClient.getHost(), recoveredClient.getPort()); } } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java index 4f7c9a5..5a0a3ab 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java @@ -24,7 +24,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import 
java.net.ConnectException; import java.net.Inet4Address; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -40,14 +39,16 @@ import java.util.Collections; * */ public class RemoteInterpreterUtils { - static Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterUtils.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterUtils.class); + private RemoteInterpreterUtils() { + throw new IllegalStateException("Utility class"); + } public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOException { int port; try (ServerSocket socket = new ServerSocket(0);) { port = socket.getLocalPort(); - socket.close(); } return port; } @@ -117,25 +118,14 @@ public class RemoteInterpreterUtils { } public static boolean checkIfRemoteEndpointAccessible(String host, int port) { - try { - Socket discover = new Socket(); + try (Socket discover = new Socket()) { discover.setSoTimeout(1000); discover.connect(new InetSocketAddress(host, port), 1000); - discover.close(); return true; - } catch (ConnectException cne) { + } catch (IOException e) { // end point is not accessible - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Remote endpoint '" + host + ":" + port + "' is not accessible " + - "(might be initializing): " + cne.getMessage()); - } - return false; - } catch (IOException ioe) { - // end point is not accessible - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Remote endpoint '" + host + ":" + port + "' is not accessible " + - "(might be initializing): " + ioe.getMessage()); - } + LOGGER.debug("Remote endpoint '{}:{}' is not accessible " + + "(might be initializing): {}" , host, port, e.getMessage()); return false; } } @@ -143,7 +133,7 @@ public class RemoteInterpreterUtils { public static String getInterpreterSettingId(String intpGrpId) { String settingId = null; if (intpGrpId != null) { - int indexOfColon = intpGrpId.indexOf("-"); + int indexOfColon = intpGrpId.indexOf('-'); settingId = intpGrpId.substring(0, 
indexOfColon); } return settingId; diff --git a/zeppelin-plugins/launcher/k8s-standard/pom.xml b/zeppelin-plugins/launcher/k8s-standard/pom.xml index a5a385c..077408e 100644 --- a/zeppelin-plugins/launcher/k8s-standard/pom.xml +++ b/zeppelin-plugins/launcher/k8s-standard/pom.xml @@ -37,22 +37,50 @@ <properties> <plugin.name>Launcher/K8sStandardInterpreterLauncher</plugin.name> + <kubernetes.client.version>4.10.2</kubernetes.client.version> + <jinjava.version>2.5.4</jinjava.version> </properties> <dependencies> <dependency> + <groupId>io.fabric8</groupId> + <artifactId>kubernetes-client</artifactId> + <version>${kubernetes.client.version}</version> + <scope>compile</scope> + </dependency> + + <dependency> <groupId>com.hubspot.jinjava</groupId> <artifactId>jinjava</artifactId> - <version>2.4.12</version> + <version>${jinjava.version}</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </exclusion> + </exclusions> + </dependency> + <!-- Test libraries --> + <dependency> + <groupId>io.fabric8</groupId> + <artifactId>kubernetes-server-mock</artifactId> + <version>${kubernetes.client.version}</version> + <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> - <artifactId>maven-dependency-plugin</artifactId> - </plugin> - <plugin> <artifactId>maven-enforcer-plugin</artifactId> <executions> <execution> @@ -61,6 +89,47 @@ </execution> </executions> </plugin> + <!-- Shade the whole plugin, because spark provides an other version of okio --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + 
<goal>shade</goal> + </goals> + </execution> + </executions> + <configuration> + <relocations> + <relocation> + <pattern>okio</pattern> + <shadedPattern>org.apache.zeppelin.shaded.okio</shadedPattern> + </relocation> + </relocations> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </plugin> + <!-- disable maven-dependency-plugin with execution copy-plugin-dependencies because we shade the whole dependency --> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy-plugin-dependencies</id> + <phase>none</phase> + </execution> + </executions> + </plugin> </plugins> </build> </project> diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java index 5735c31..5df2f02 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java @@ -1,17 +1,16 @@ package org.apache.zeppelin.interpreter.launcher; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; import java.io.File; import java.io.IOException; -import java.util.*; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; -import 
com.hubspot.jinjava.Jinjava; -import org.apache.commons.exec.ExecuteWatchdog; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; @@ -19,10 +18,23 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.hubspot.jinjava.Jinjava; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodStatus; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.LocalPortForward; +import io.fabric8.kubernetes.client.dsl.ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable; + public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { private static final Logger LOGGER = LoggerFactory.getLogger(K8sRemoteInterpreterProcess.class); private static final int K8S_INTERPRETER_SERVICE_PORT = 12321; - private final Kubectl kubectl; + private final KubernetesClient client; + private final String namespace; private final String interpreterGroupId; private final String interpreterGroupName; private final String interpreterSettingName; @@ -31,15 +43,13 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { private final Properties properties; private final Map<String, String> envs; - private final Gson gson = new Gson(); private final String podName; private final boolean portForward; private final String sparkImage; - private ExecuteWatchdog portForwardWatchdog; + private LocalPortForward localPortForward; private int podPort = K8S_INTERPRETER_SERVICE_PORT; private final boolean isUserImpersonatedForSpark; - private String userName; private 
AtomicBoolean started = new AtomicBoolean(false); private Random rand = new Random(); @@ -50,7 +60,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { private static final String ENV_SERVICE_DOMAIN = "SERVICE_DOMAIN"; public K8sRemoteInterpreterProcess( - Kubectl kubectl, + KubernetesClient client, + String namespace, File specTemplates, String containerImage, String interpreterGroupId, @@ -66,7 +77,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { boolean isUserImpersonatedForSpark ) { super(connectTimeout, intpEventServerHost, intpEventServerPort); - this.kubectl = kubectl; + this.client = client; + this.namespace = namespace; this.specTempaltes = specTemplates; this.containerImage = containerImage; this.interpreterGroupId = interpreterGroupId; @@ -85,11 +97,18 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { * Get interpreter pod name * @return */ - @VisibleForTesting - String getPodName() { + public String getPodName() { return podName; } + /** + * Get namespace + * @return + */ + public String getNamespace() { + return namespace; + } + @Override public String getInterpreterGroupId() { return interpreterGroupId; @@ -102,76 +121,71 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { @Override public void start(String userName) throws IOException { - /** - * If a spark interpreter process is running, userName is set in preparation for --proxy-user - */ - if (isUserImpersonatedForSpark && !StringUtils.containsIgnoreCase(userName, "anonymous") && isSpark()) { - this.userName = userName; - } else { - this.userName = null; - } + + Properties templateProperties = getTemplateBindings(userName); // create new pod - apply(specTempaltes, false); - kubectl.wait(String.format("pod/%s", getPodName()), "condition=Ready", getConnectTimeout()/1000); + apply(specTempaltes, false, templateProperties); if (portForward) { podPort = 
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); - portForwardWatchdog = kubectl.portForward( - String.format("pod/%s", getPodName()), - new String[] { - String.format("%s:%s", podPort, K8S_INTERPRETER_SERVICE_PORT) - }); + localPortForward = client.pods().inNamespace(namespace).withName(podName).portForward(K8S_INTERPRETER_SERVICE_PORT, podPort); } long startTime = System.currentTimeMillis(); + long timeoutTime = startTime + getConnectTimeout(); // wait until interpreter send started message through thrift rpc synchronized (started) { - if (!started.get()) { + while (!started.get()) { + long timetoTimeout = timeoutTime - System.currentTimeMillis(); + if (timetoTimeout <= 0) { + stop(); + throw new IOException("Launching zeppelin interpreter on kubernetes is time out, kill it now"); + } try { - started.wait(getConnectTimeout()); + started.wait(timetoTimeout); } catch (InterruptedException e) { - LOGGER.error("Remote interpreter is not accessible"); + LOGGER.error("Interrupt received. Try to stop the interpreter and interrupt the current thread.", e); + stop(); + Thread.currentThread().interrupt(); } } } - if (!started.get()) { - LOGGER.info("Interpreter pod creation is time out in {} seconds", getConnectTimeout()/1000); - } - // waits for interpreter thrift rpc server ready - while (System.currentTimeMillis() - startTime < getConnectTimeout()) { - if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) { - break; - } else { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - } + while (!RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) { + if (System.currentTimeMillis() - timeoutTime > 0) { + stop(); + throw new IOException("Launching zeppelin interpreter on kubernetes is time out, kill it now"); + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOGGER.error("Interrupt received. 
Try to stop the interpreter and interrupt the current thread.", e); + stop(); + Thread.currentThread().interrupt(); } } } @Override public void stop() { + Properties templateProperties = getTemplateBindings(null); // delete pod try { - apply(specTempaltes, true); + apply(specTempaltes, true, templateProperties); } catch (IOException e) { LOGGER.info("Error on removing interpreter pod", e); } - - try { - kubectl.wait(String.format("pod/%s", getPodName()), "delete", 60); - } catch (IOException e) { - LOGGER.debug("Error on waiting pod delete", e); - } - - - if (portForwardWatchdog != null) { - portForwardWatchdog.destroyProcess(); + if (portForward) { + try { + localPortForward.close(); + } catch (IOException e) { + LOGGER.info("Error on closing portforwarder", e); + } } + // Shutdown connection + shutdown(); } @Override @@ -194,40 +208,24 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) { return true; } - - String ret = kubectl.execAndGet(new String[]{ - "get", - String.format("pods/%s", getPodName()), - "-o", - "json" - }); - - if (ret == null) { - return false; - } - - Map<String, Object> pod = gson.fromJson(ret, new TypeToken<Map<String, Object>>() {}.getType()); - if (pod == null || !pod.containsKey("status")) { - return false; - } - - Map<String, Object> status = (Map<String, Object>) pod.get("status"); - if (status == null || !status.containsKey("phase")) { - return false; + Pod pod = client.pods().inNamespace(namespace).withName(podName).get(); + if (pod != null) { + PodStatus status = pod.getStatus(); + if (status != null) { + return "Running".equals(status.getPhase()) && started.get(); + } } - - return "Running".equals(status.get("phase")) && started.get(); } catch (Exception e) { LOGGER.error("Can't get pod status", e); - return false; } + return false; } /** * Apply spec file(s) in the path. 
* @param path */ - void apply(File path, boolean delete) throws IOException { + void apply(File path, boolean delete, Properties templateProperties) throws IOException { if (path.getName().startsWith(".") || path.isHidden() || path.getName().endsWith("~")) { LOGGER.info("Skip {}", path.getAbsolutePath()); } @@ -240,18 +238,19 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { } for (File f : files) { - apply(f, delete); + apply(f, delete, templateProperties); } } else if (path.isFile()) { - LOGGER.info("Apply {}", path.getAbsolutePath()); K8sSpecTemplate specTemplate = new K8sSpecTemplate(); - specTemplate.loadProperties(getTemplateBindings()); - - String spec = specTemplate.render(path); + specTemplate.loadProperties(templateProperties); + String template = specTemplate.render(path); + ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata, Boolean> k8sObjects = client.load(IOUtils.toInputStream(template, StandardCharsets.UTF_8)); + LOGGER.info("Apply {} with {} K8s Objects", path.getAbsolutePath(), k8sObjects.get().size()); + LOGGER.debug(template); if (delete) { - kubectl.delete(spec); + k8sObjects.inNamespace(namespace).delete(); } else { - kubectl.apply(spec); + k8sObjects.inNamespace(namespace).createOrReplace(); } } else { LOGGER.error("Can't apply {}", path.getAbsolutePath()); @@ -259,11 +258,11 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { } @VisibleForTesting - Properties getTemplateBindings() { + Properties getTemplateBindings(String userName) { Properties k8sProperties = new Properties(); // k8s template properties - k8sProperties.put("zeppelin.k8s.namespace", kubectl.getNamespace()); + k8sProperties.put("zeppelin.k8s.namespace", getNamespace()); k8sProperties.put("zeppelin.k8s.interpreter.pod.name", getPodName()); k8sProperties.put("zeppelin.k8s.interpreter.container.name", interpreterGroupName.toLowerCase()); 
k8sProperties.put("zeppelin.k8s.interpreter.container.image", containerImage); @@ -287,7 +286,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { int webUiPort = 4040; k8sProperties.put("zeppelin.k8s.spark.container.image", sparkImage); if (isSparkOnKubernetes(properties)) { - envs.put("SPARK_SUBMIT_OPTIONS", envs.getOrDefault("SPARK_SUBMIT_OPTIONS", "") + buildSparkSubmitOptions()); + envs.put("SPARK_SUBMIT_OPTIONS", envs.getOrDefault("SPARK_SUBMIT_OPTIONS", "") + buildSparkSubmitOptions(userName)); } envs.put("SPARK_HOME", envs.getOrDefault("SPARK_HOME", "/spark")); @@ -355,7 +354,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { } @VisibleForTesting - String buildSparkSubmitOptions() { + String buildSparkSubmitOptions(String userName) { StringBuilder options = new StringBuilder(); options.append(" --master k8s://https://kubernetes.default.svc"); @@ -363,10 +362,10 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { if (properties.containsKey(SPARK_DRIVER_MEMROY)) { options.append(" --driver-memory " + properties.get(SPARK_DRIVER_MEMROY)); } - if (userName != null) { + if (isUserImpersonatedForSpark && !StringUtils.containsIgnoreCase(userName, "anonymous") && isSpark()) { options.append(" --proxy-user " + userName); } - options.append(" --conf spark.kubernetes.namespace=" + kubectl.getNamespace()); + options.append(" --conf spark.kubernetes.namespace=" + getNamespace()); options.append(" --conf spark.executor.instances=1"); options.append(" --conf spark.kubernetes.driver.pod.name=" + getPodName()); options.append(" --conf spark.kubernetes.container.image=" + sparkImage); @@ -381,7 +380,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { private String getInterpreterPodDnsName() { return String.format("%s.%s.svc", getPodName(), // service name and pod name is the same - kubectl.getNamespace()); + getNamespace()); } /** @@ -431,7 +430,7 @@ public 
class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess { LOGGER.info("Interpreter pod created {}:{}", host, port); synchronized (started) { started.set(true); - started.notify(); + started.notifyAll(); } } diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java index 2ed2c13..5abfb48 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java @@ -16,9 +16,6 @@ */ package org.apache.zeppelin.interpreter.launcher; -import com.hubspot.jinjava.Jinjava; -import org.apache.commons.io.FileUtils; - import java.io.File; import java.io.IOException; import java.nio.charset.Charset; @@ -27,6 +24,11 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import org.apache.commons.io.FileUtils; + +import com.hubspot.jinjava.Jinjava; +import com.hubspot.jinjava.JinjavaConfig; + public class K8sSpecTemplate extends HashMap<String, Object> { public String render(File templateFile) throws IOException { String template = FileUtils.readFileToString(templateFile, Charset.defaultCharset()); @@ -37,7 +39,8 @@ public class K8sSpecTemplate extends HashMap<String, Object> { ClassLoader oldCl = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); - Jinjava jinja = new Jinjava(); + JinjavaConfig config = JinjavaConfig.newBuilder().withLstripBlocks(true).withTrimBlocks(true).build(); + Jinjava jinja = new Jinjava(config); return jinja.render(template, this); } finally { Thread.currentThread().setContextClassLoader(oldCl); diff --git 
a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java index d1df0f3..799a91c 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java @@ -18,22 +18,24 @@ package org.apache.zeppelin.interpreter.launcher; import java.io.File; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; -import com.google.common.annotations.VisibleForTesting; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; /** * Interpreter Launcher which use shell script to launch the interpreter process. 
@@ -41,23 +43,14 @@ import java.util.Map; public class K8sStandardInterpreterLauncher extends InterpreterLauncher { private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class); - private final Kubectl kubectl; private InterpreterLaunchContext context; - + private final KubernetesClient client; public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) throws IOException { super(zConf, recoveryStorage); - kubectl = new Kubectl(zConf.getK8sKubectlCmd()); - kubectl.setNamespace(getNamespace()); + client = new DefaultKubernetesClient(); } - @VisibleForTesting - K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage, Kubectl kubectl) { - super(zConf, recoveryStorage); - this.kubectl = kubectl; - } - - /** * Check if i'm running inside of kubernetes or not. * It should return truth regardless of ZeppelinConfiguration.getRunMode(). @@ -79,9 +72,9 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher { */ String getNamespace() throws IOException { if (isRunningOnKubernetes()) { - return readFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace", Charset.defaultCharset()).trim(); + return readFile(Config.KUBERNETES_NAMESPACE_PATH, Charset.defaultCharset()).trim(); } else { - return "default"; + return zConf.getK8sNamepsace(); } } @@ -143,10 +136,10 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher { LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup()); this.context = context; this.properties = context.getProperties(); - int connectTimeout = getConnectTimeout(); return new K8sRemoteInterpreterProcess( - kubectl, + client, + getNamespace(), new File(zConf.getK8sTemplatesDir(), "interpreter"), zConf.getK8sContainerImage(), context.getInterpreterGroupId(), @@ -158,7 +151,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher { getZeppelinServiceRpcPort(), 
zConf.getK8sPortForward(), zConf.getK8sSparkContainerImage(), - connectTimeout, + getConnectTimeout(), isUserImpersonateForSparkInterpreter(context)); } diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java deleted file mode 100644 index a0ceb0a..0000000 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.interpreter.launcher; - -import com.google.common.annotations.VisibleForTesting; -import java.util.ArrayList; -import java.util.Arrays; - -import org.apache.commons.exec.*; -import org.apache.commons.io.IOUtils; - -import java.io.*; -import java.nio.charset.StandardCharsets; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class Kubectl { - private static final Logger LOGGER = LoggerFactory.getLogger(Kubectl.class); - private final String kubectlCmd; - private String namespace; - - public Kubectl(String kubectlCmd) { - this.kubectlCmd = kubectlCmd; - } - - /** - * Override namespace. Otherwise use namespace provided in schema - * @param namespace - */ - public void setNamespace(String namespace) { - this.namespace = namespace; - } - - public String getNamespace() { - return namespace; - } - - public String apply(String spec) throws IOException { - return execAndGet(new String[]{"apply", "-f", "-"}, spec); - } - - public String delete(String spec) throws IOException { - return execAndGet(new String[]{"delete", "-f", "-"}, spec); - } - - public String wait(String resource, String waitFor, int timeoutSec) throws IOException { - try { - return execAndGet(new String[]{ - "wait", - resource, - String.format("--for=%s", waitFor), - String.format("--timeout=%ds", timeoutSec)}); - } catch (IOException e) { - if ("delete".equals(waitFor) && e.getMessage().contains("NotFound")) { - LOGGER.info("{} Not found. 
Maybe already deleted.", resource); - return ""; - } else { - throw e; - } - } - } - - public ExecuteWatchdog portForward(String resource, String [] ports) throws IOException { - DefaultExecutor executor = new DefaultExecutor(); - CommandLine cmd = new CommandLine(kubectlCmd); - cmd.addArguments("port-forward"); - cmd.addArguments(resource); - cmd.addArguments(ports); - - ExecuteWatchdog watchdog = new ExecuteWatchdog(-1); - executor.setWatchdog(watchdog); - - executor.execute(cmd, new ExecuteResultHandler() { - @Override - public void onProcessComplete(int i) { - LOGGER.info("Port-forward stopped"); - } - - @Override - public void onProcessFailed(ExecuteException e) { - LOGGER.debug("port-forward process exit", e); - } - }); - - return watchdog; - } - - String execAndGet(String [] args) throws IOException { - return execAndGet(args, ""); - } - - @VisibleForTesting - String execAndGet(String [] args, String stdin) throws IOException { - InputStream ins = IOUtils.toInputStream(stdin, StandardCharsets.UTF_8); - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - ArrayList<String> argsToOverride = new ArrayList<>(Arrays.asList(args)); - - // set namespace - if (namespace != null) { - argsToOverride.add("--namespace=" + namespace); - } - - LOGGER.info("kubectl {}", argsToOverride); - LOGGER.debug(stdin); - - try { - int exitCode = execute( - argsToOverride.toArray(new String[0]), - ins, - stdout, - stderr - ); - - if (exitCode == 0) { - return new String(stdout.toByteArray()); - } else { - String output = new String(stderr.toByteArray()); - throw new IOException(String.format("non zero return code (%d). 
%s", exitCode, output)); - } - } catch (Exception e) { - String output = new String(stderr.toByteArray()); - throw new IOException(output, e); - } - } - - public int execute(String [] args, InputStream stdin, OutputStream stdout, OutputStream stderr) throws IOException { - DefaultExecutor executor = new DefaultExecutor(); - CommandLine cmd = new CommandLine(kubectlCmd); - cmd.addArguments(args); - - ExecuteWatchdog watchdog = new ExecuteWatchdog(60 * 1000L); - executor.setWatchdog(watchdog); - - PumpStreamHandler streamHandler = new PumpStreamHandler(stdout, stderr, stdin); - executor.setStreamHandler(streamHandler); - return executor.execute(cmd); - } -} diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java index 26467a0..ad4d5fd 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java @@ -17,32 +17,34 @@ package org.apache.zeppelin.interpreter.launcher; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.Properties; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import org.junit.Rule; +import org.junit.Test; + +import io.fabric8.kubernetes.client.server.mock.KubernetesServer; public class K8sRemoteInterpreterProcessTest { + @Rule + public KubernetesServer server 
= new KubernetesServer(true, true); + @Test public void testGetHostPort() { // given - Kubectl kubectl = mock(Kubectl.class); - when(kubectl.getNamespace()).thenReturn("default"); - Properties properties = new Properties(); HashMap<String, String> envs = new HashMap<String, String>(); K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( - kubectl, + server.getClient(), + "default", new File(".skip"), "interpreter-container:1.0", "shared_process", @@ -58,21 +60,19 @@ public class K8sRemoteInterpreterProcessTest { false); // then - assertEquals(String.format("%s.%s.svc", intp.getPodName(), kubectl.getNamespace()), intp.getHost()); + assertEquals(String.format("%s.%s.svc", intp.getPodName(), "default"), intp.getHost()); assertEquals(12321, intp.getPort()); } @Test public void testPredefinedPortNumbers() { // given - Kubectl kubectl = mock(Kubectl.class); - when(kubectl.getNamespace()).thenReturn("default"); - Properties properties = new Properties(); HashMap<String, String> envs = new HashMap<String, String>(); K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( - kubectl, + server.getClient(), + "default", new File(".skip"), "interpreter-container:1.0", "shared_process", @@ -98,16 +98,14 @@ public class K8sRemoteInterpreterProcessTest { @Test public void testGetTemplateBindings() throws IOException { // given - Kubectl kubectl = mock(Kubectl.class); - when(kubectl.getNamespace()).thenReturn("default"); - Properties properties = new Properties(); properties.put("my.key1", "v1"); HashMap<String, String> envs = new HashMap<String, String>(); envs.put("MY_ENV1", "V1"); K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( - kubectl, + server.getClient(), + "default", new File(".skip"), "interpreter-container:1.0", "shared_process", @@ -123,7 +121,7 @@ public class K8sRemoteInterpreterProcessTest { false); // when - Properties p = intp.getTemplateBindings(); + Properties p = intp.getTemplateBindings(null); // then 
assertEquals("default", p.get("zeppelin.k8s.namespace")); @@ -148,9 +146,6 @@ public class K8sRemoteInterpreterProcessTest { @Test public void testGetTemplateBindingsForSpark() throws IOException { // given - Kubectl kubectl = mock(Kubectl.class); - when(kubectl.getNamespace()).thenReturn("default"); - Properties properties = new Properties(); properties.put("my.key1", "v1"); properties.put("spark.master", "k8s://http://api"); @@ -160,7 +155,8 @@ public class K8sRemoteInterpreterProcessTest { envs.put("SERVICE_DOMAIN", "mydomain"); K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( - kubectl, + server.getClient(), + "default", new File(".skip"), "interpreter-container:1.0", "shared_process", @@ -176,8 +172,7 @@ public class K8sRemoteInterpreterProcessTest { false); // when - intp.start("mytestUser"); - Properties p = intp.getTemplateBindings(); + Properties p = intp.getTemplateBindings("mytestUser"); // then assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image")); @@ -188,7 +183,7 @@ public class K8sRemoteInterpreterProcessTest { String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS"); assertTrue(sparkSubmitOptions.startsWith("my options ")); - assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=" + kubectl.getNamespace())); + assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=default")); assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName())); assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0")); assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost())); @@ -201,9 +196,6 @@ public class K8sRemoteInterpreterProcessTest { @Test public void testGetTemplateBindingsForSparkWithProxyUser() throws IOException { // given - Kubectl kubectl = mock(Kubectl.class); - when(kubectl.getNamespace()).thenReturn("default"); - Properties properties = new Properties(); properties.put("my.key1", "v1"); 
properties.put("spark.master", "k8s://http://api"); @@ -213,7 +205,8 @@ public class K8sRemoteInterpreterProcessTest { envs.put("SERVICE_DOMAIN", "mydomain"); K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( - kubectl, + server.getClient(), + "default", new File(".skip"), "interpreter-container:1.0", "shared_process", @@ -229,8 +222,7 @@ public class K8sRemoteInterpreterProcessTest { true); // when - intp.start("mytestUser"); - Properties p = intp.getTemplateBindings(); + Properties p = intp.getTemplateBindings("mytestUser"); // then assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image")); assertEquals(String.format("//4040-%s.%s", intp.getPodName(), "mydomain"), p.get("zeppelin.spark.uiWebUrl")); @@ -240,7 +232,7 @@ public class K8sRemoteInterpreterProcessTest { String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS"); assertTrue(sparkSubmitOptions.startsWith("my options ")); - assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=" + kubectl.getNamespace())); + assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=default")); assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName())); assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0")); assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost())); @@ -253,9 +245,6 @@ public class K8sRemoteInterpreterProcessTest { @Test public void testGetTemplateBindingsForSparkWithProxyUserAnonymous() throws IOException { // given - Kubectl kubectl = mock(Kubectl.class); - when(kubectl.getNamespace()).thenReturn("default"); - Properties properties = new Properties(); properties.put("my.key1", "v1"); properties.put("spark.master", "k8s://http://api"); @@ -265,7 +254,8 @@ public class K8sRemoteInterpreterProcessTest { envs.put("SERVICE_DOMAIN", "mydomain"); K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( - kubectl, + server.getClient(), + 
"default", new File(".skip"), "interpreter-container:1.0", "shared_process", @@ -281,8 +271,7 @@ public class K8sRemoteInterpreterProcessTest { true); // when - intp.start("anonymous"); - Properties p = intp.getTemplateBindings(); + Properties p = intp.getTemplateBindings("anonymous"); // then assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image")); assertEquals(String.format("//4040-%s.%s", intp.getPodName(), "mydomain"), p.get("zeppelin.spark.uiWebUrl")); @@ -298,15 +287,13 @@ public class K8sRemoteInterpreterProcessTest { @Test public void testSparkUiWebUrlTemplate() { // given - Kubectl kubectl = mock(Kubectl.class); - when(kubectl.getNamespace()).thenReturn("default"); - Properties properties = new Properties(); HashMap<String, String> envs = new HashMap<String, String>(); envs.put("SERVICE_DOMAIN", "mydomain"); K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( - kubectl, + server.getClient(), + "default", new File(".skip"), "interpreter-container:1.0", "shared_process", @@ -341,9 +328,6 @@ public class K8sRemoteInterpreterProcessTest { @Test public void testSparkPodResources() { // given - Kubectl kubectl = mock(Kubectl.class); - when(kubectl.getNamespace()).thenReturn("default"); - Properties properties = new Properties(); properties.put("spark.driver.memory", "1g"); properties.put("spark.driver.cores", "1"); @@ -351,7 +335,8 @@ public class K8sRemoteInterpreterProcessTest { envs.put("SERVICE_DOMAIN", "mydomain"); K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( - kubectl, + server.getClient(), + "default", new File(".skip"), "interpreter-container:1.0", "shared_process", @@ -367,7 +352,7 @@ public class K8sRemoteInterpreterProcessTest { false); // when - Properties p = intp.getTemplateBindings(); + Properties p = intp.getTemplateBindings(null); // then assertEquals("1", p.get("zeppelin.k8s.interpreter.cores")); @@ -377,9 +362,6 @@ public class K8sRemoteInterpreterProcessTest { @Test public void 
testSparkPodResourcesMemoryOverhead() { // given - Kubectl kubectl = mock(Kubectl.class); - when(kubectl.getNamespace()).thenReturn("default"); - Properties properties = new Properties(); properties.put("spark.driver.memory", "1g"); properties.put("spark.driver.memoryOverhead", "256m"); @@ -388,7 +370,8 @@ public class K8sRemoteInterpreterProcessTest { envs.put("SERVICE_DOMAIN", "mydomain"); K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( - kubectl, + server.getClient(), + "default", new File(".skip"), "interpreter-container:1.0", "shared_process", @@ -404,7 +387,7 @@ public class K8sRemoteInterpreterProcessTest { false); // when - Properties p = intp.getTemplateBindings(); + Properties p = intp.getTemplateBindings(null); // then assertEquals("5", p.get("zeppelin.k8s.interpreter.cores")); diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java index daf3773..f859cab 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java @@ -51,6 +51,21 @@ public class K8sSpecTemplateTest { } @Test + public void testRenderWithStrip() { + // given + K8sSpecTemplate template = new K8sSpecTemplate(); + template.put("test", "test"); + // when + String spec = template.render( + " {% if test == \"test\" %}\n" + + " After commit\n" + + " {% endif %}\n"); + + // then + assertEquals(" After commit\n", spec); + } + + @Test public void testIterate() { // given K8sSpecTemplate template = new K8sSpecTemplate(); diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java 
b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java index c580a43..ff47dd0 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java @@ -17,18 +17,16 @@ package org.apache.zeppelin.interpreter.launcher; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.InterpreterOption; -import org.junit.Before; -import org.junit.Test; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.Properties; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterOption; +import org.junit.Before; +import org.junit.Test; /** * In the future, test may use minikube on travis for end-to-end test @@ -46,11 +44,8 @@ public class K8sStandardInterpreterLauncherTest { @Test public void testK8sLauncher() throws IOException { // given - Kubectl kubectl = mock(Kubectl.class); - when(kubectl.getNamespace()).thenReturn("default"); - ZeppelinConfiguration zConf = new ZeppelinConfiguration(); - K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null, kubectl); + K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null); Properties properties = new Properties(); properties.setProperty("ENV_1", "VALUE_1"); properties.setProperty("property_1", "value_1"); @@ -78,11 +73,8 @@ public class K8sStandardInterpreterLauncherTest { @Test public void testK8sLauncherWithSparkAndUserImpersonate() throws 
IOException { // given - Kubectl kubectl = mock(Kubectl.class); - when(kubectl.getNamespace()).thenReturn("default"); - ZeppelinConfiguration zConf = new ZeppelinConfiguration(); - K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null, kubectl); + K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null); Properties properties = new Properties(); properties.setProperty("ENV_1", "VALUE_1"); properties.setProperty("property_1", "value_1"); @@ -108,22 +100,14 @@ public class K8sStandardInterpreterLauncherTest { assertTrue(client instanceof K8sRemoteInterpreterProcess); K8sRemoteInterpreterProcess process = (K8sRemoteInterpreterProcess) client; assertTrue(process.isSpark()); - - // when - process.start(context.getUserName()); - - // then - assertTrue(process.buildSparkSubmitOptions().contains("--proxy-user user1")); + assertTrue(process.buildSparkSubmitOptions(context.getUserName()).contains("--proxy-user user1")); } @Test public void testK8sLauncherWithSparkAndWithoutUserImpersonate() throws IOException { // given - Kubectl kubectl = mock(Kubectl.class); - when(kubectl.getNamespace()).thenReturn("default"); - ZeppelinConfiguration zConf = new ZeppelinConfiguration(); - K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null, kubectl); + K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null); Properties properties = new Properties(); properties.setProperty("ENV_1", "VALUE_1"); properties.setProperty("property_1", "value_1"); @@ -149,11 +133,6 @@ public class K8sStandardInterpreterLauncherTest { assertTrue(client instanceof K8sRemoteInterpreterProcess); K8sRemoteInterpreterProcess process = (K8sRemoteInterpreterProcess) client; assertTrue(process.isSpark()); - - // when - process.start(context.getUserName()); - - // then - assertFalse(process.buildSparkSubmitOptions().contains("--proxy-user user1")); + 
assertFalse(process.buildSparkSubmitOptions(context.getUserName()).contains("--proxy-user user1")); } } diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/KubectlTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/KubectlTest.java deleted file mode 100644 index 072cf94..0000000 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/KubectlTest.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.zeppelin.interpreter.launcher; - -import org.apache.commons.io.IOUtils; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class KubectlTest { - - @Test(expected = IOException.class) - public void testKubeclCommandNotExists() throws IOException { - // given - Kubectl kubectl = new Kubectl("invalidcommand"); - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - - // when - kubectl.execute(new String[] {}, null, stdout, stderr); - - // then throw IOException - } - - @Test - public void testStdout() throws IOException { - // given - Kubectl kubectl = new Kubectl("echo"); - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - - // when - kubectl.execute(new String[] {"hello"}, null, stdout, stderr); - - // then - assertEquals("hello\n", stdout.toString()); - assertEquals("", stderr.toString()); - } - - @Test - public void testStderr() throws IOException { - // given - Kubectl kubectl = new Kubectl("sh"); - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - - // when - try { - kubectl.execute(new String[]{"-c", "yoyo"}, null, stdout, stderr); - } catch (IOException e) { - } - - // then - assertEquals("", stdout.toString()); - assertTrue(0 < stderr.toString().length()); - } - - @Test - public void testStdin() throws IOException { - // given - Kubectl kubectl = new Kubectl("wc"); - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - InputStream stdin = IOUtils.toInputStream("Hello"); - - // when - kubectl.execute(new String[]{"-c"}, stdin, stdout, stderr); - - // then - assertEquals("5", 
stdout.toString().trim()); - assertEquals("", stderr.toString()); - } - - @Test - public void testExecSpecAndGet() throws IOException { - // given - Kubectl kubectl = new Kubectl("cat"); - String spec = "{'k1': 'v1', 'k2': 2}"; - - // when - String result = kubectl.execAndGet(new String[]{}, spec); - - // then - assertEquals(spec, result); - } -}