This is an automated email from the ASF dual-hosted git repository. jongyoul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new af8280685b [ZEPPELIN-5855] Refactor docker plugin and remove powermock (#4519) af8280685b is described below commit af8280685b4dc916521f4a279c64eb455ef52b4d Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Wed Feb 1 11:16:17 2023 +0100 [ZEPPELIN-5855] Refactor docker plugin and remove powermock (#4519) * Refactor docker plugin * prevent infinite loop --- docs/quickstart/docker.md | 2 +- .../zeppelin/conf/ZeppelinConfiguration.java | 7 + zeppelin-plugins/launcher/docker/pom.xml | 8 -- .../launcher/DockerInterpreterProcess.java | 148 ++++++++++----------- .../interpreter/launcher/utils/TarUtils.java | 47 ++++--- .../launcher/DockerInterpreterProcessTest.java | 109 +++++++-------- 6 files changed, 157 insertions(+), 164 deletions(-) diff --git a/docs/quickstart/docker.md b/docs/quickstart/docker.md index 45e6bee669..187e9f9c83 100644 --- a/docs/quickstart/docker.md +++ b/docs/quickstart/docker.md @@ -88,7 +88,7 @@ access to this port. Set to the same time zone as the zeppelin server, keeping the time zone in the interpreter docker container the same as the server. E.g, `"America/New_York"` or `"Asia/Shanghai"` ```bash - export DOCKER_TIME_ZONE="America/New_York" + export ZEPPELIN_DOCKER_TIME_ZONE="America/New_York" ``` diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index f251c3182a..efeeea3552 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TimeZone; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -1086,8 +1087,14 @@ public class ZeppelinConfiguration { ZEPPELIN_K8S_SERVICE_NAME("zeppelin.k8s.service.name", "zeppelin-server"), ZEPPELIN_K8S_TIMEOUT_DURING_PENDING("zeppelin.k8s.timeout.during.pending", true), + // Used by K8s and Docker plugin ZEPPELIN_DOCKER_CONTAINER_IMAGE("zeppelin.docker.container.image", "apache/zeppelin:" + Util.getVersion()), + ZEPPELIN_DOCKER_CONTAINER_SPARK_HOME("zeppelin.docker.container.spark.home", "/spark"), + ZEPPELIN_DOCKER_UPLOAD_LOCAL_LIB_TO_CONTAINTER("zeppelin.docker.upload.local.lib.to.container", true), + ZEPPELIN_DOCKER_HOST("zeppelin.docker.host", "http://0.0.0.0:2375"), + ZEPPELIN_DOCKER_TIME_ZONE("zeppelin.docker.time.zone", TimeZone.getDefault().getID()), + ZEPPELIN_METRIC_ENABLE_PROMETHEUS("zeppelin.metric.enable.prometheus", false), ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER("zeppelin.impersonate.spark.proxy.user", true), diff --git a/zeppelin-plugins/launcher/docker/pom.xml b/zeppelin-plugins/launcher/docker/pom.xml index 27e6b0bb6c..911d00a34b 100644 --- a/zeppelin-plugins/launcher/docker/pom.xml +++ b/zeppelin-plugins/launcher/docker/pom.xml @@ -61,14 +61,6 @@ <artifactId>commons-compress</artifactId> <version>${commons.compress.version}</version> </dependency> - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-api-mockito2</artifactId> - </dependency> - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-module-junit4</artifactId> - </dependency> </dependencies> <build> diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java index 551e36bd7b..228eee5e7a 100644 --- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java +++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java @@ -24,12 +24,11 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.net.SocketException; import java.net.URI; import java.net.URL; -import java.net.UnknownHostException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; import java.util.Date; @@ -37,7 +36,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.TimeZone; import java.util.concurrent.atomic.AtomicBoolean; import com.spotify.docker.client.DefaultDockerClient; @@ -56,6 +54,7 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.FileFilterUtils; import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.interpreter.launcher.utils.TarFileEntry; import org.apache.zeppelin.interpreter.launcher.utils.TarUtils; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; @@ -67,7 +66,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB; public class DockerInterpreterProcess extends RemoteInterpreterProcess { - private static final Logger LOGGER = LoggerFactory.getLogger(DockerInterpreterLauncher.class); + private static final Logger LOGGER = LoggerFactory.getLogger(DockerInterpreterProcess.class); private String dockerIntpServicePort = "0"; @@ -99,13 +98,12 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { private String zeppelinHome; @VisibleForTesting - final String CONTAINER_SPARK_HOME; + final String containerSparkHome; @VisibleForTesting - final String DOCKER_HOST; + final String dockerHost; - private String containerId; - final String CONTAINER_UPLOAD_TAR_DIR = "/tmp/zeppelin-tar"; + private static final String CONTAINER_UPLOAD_TAR_DIR = "/tmp/zeppelin-tar"; public DockerInterpreterProcess( ZeppelinConfiguration zconf, @@ -127,27 +125,20 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { this.interpreterGroupName = interpreterGroupName; this.interpreterSettingName = interpreterSettingName; this.properties = properties; - this.envs = new HashMap(envs); + this.envs = new HashMap<>(envs); this.zconf = zconf; this.containerName = interpreterGroupId.toLowerCase(); - String sparkHome = System.getenv("CONTAINER_SPARK_HOME"); - CONTAINER_SPARK_HOME = (sparkHome == null) ? "/spark" : sparkHome; - - String uploadLocalLib = System.getenv("UPLOAD_LOCAL_LIB_TO_CONTAINTER"); - if (null != uploadLocalLib && StringUtils.equals(uploadLocalLib, "false")) { - uploadLocalLibToContainter = false; - } + containerSparkHome = zconf.getString(ConfVars.ZEPPELIN_DOCKER_CONTAINER_SPARK_HOME); + uploadLocalLibToContainter = zconf.getBoolean(ConfVars.ZEPPELIN_DOCKER_UPLOAD_LOCAL_LIB_TO_CONTAINTER); try { this.zeppelinHome = getZeppelinHome(); } catch (IOException e) { LOGGER.error(e.getMessage(), e); } - String defDockerHost = "http://0.0.0.0:2375"; - String dockerHost = System.getenv("DOCKER_HOST"); - DOCKER_HOST = (dockerHost == null) ? defDockerHost : dockerHost; + dockerHost = zconf.getString(ConfVars.ZEPPELIN_DOCKER_HOST); } @Override @@ -162,7 +153,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { @Override public void start(String userName) throws IOException { - docker = DefaultDockerClient.builder().uri(URI.create(DOCKER_HOST)).build(); + docker = DefaultDockerClient.builder().uri(URI.create(dockerHost)).build(); removeExistContainer(containerName); @@ -229,7 +220,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { final ContainerCreation containerCreation = docker.createContainer(containerConfig, containerName); - this.containerId = containerCreation.id(); + String containerId = containerCreation.id(); // Start container docker.startContainer(containerId); @@ -238,32 +229,37 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { execInContainer(containerId, dockerCommand, false); } catch (DockerException e) { - LOGGER.error(e.getMessage(), e); - throw new IOException(e.getMessage()); + throw new IOException(e); } catch (InterruptedException e) { - LOGGER.error(e.getMessage(), e); - throw new IOException(e.getMessage()); + // Restore interrupted state... + Thread.currentThread().interrupt(); + throw new IOException("Docker preparations were interrupted.", e); } long startTime = System.currentTimeMillis(); - + long timeoutTime = startTime + getConnectTimeout(); // wait until interpreter send dockerStarted message through thrift rpc synchronized (dockerStarted) { - if (!dockerStarted.get()) { + while (!dockerStarted.get() && !Thread.currentThread().isInterrupted()) { + long timeToTimeout = timeoutTime - System.currentTimeMillis(); + if (timeToTimeout <= 0) { + LOGGER.info("Interpreter docker creation is time out in {} seconds", + getConnectTimeout() / 1000); + stop(); + throw new IOException( + "Launching zeppelin interpreter on docker is time out, kill it now"); + } try { - dockerStarted.wait(getConnectTimeout()); + dockerStarted.wait(timeToTimeout); } catch (InterruptedException e) { - LOGGER.error("Remote interpreter is not accessible"); - throw new IOException(e.getMessage()); + // Restore interrupted state... + Thread.currentThread().interrupt(); + stop(); + throw new IOException("Remote interpreter is not accessible", e); } } } - if (!dockerStarted.get()) { - LOGGER.info("Interpreter docker creation is time out in {} seconds", - getConnectTimeout() / 1000); - } - // waits for interpreter thrift rpc server ready while (System.currentTimeMillis() - startTime < getConnectTimeout()) { if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) { @@ -273,6 +269,8 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { Thread.sleep(1000); } catch (InterruptedException e) { LOGGER.error(e.getMessage(), e); + // Restore interrupted state... + Thread.currentThread().interrupt(); } } } @@ -285,12 +283,12 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { LOGGER.info("Interpreter container created {}:{}", containerHost, containerPort); synchronized (dockerStarted) { dockerStarted.set(true); - dockerStarted.notify(); + dockerStarted.notifyAll(); } } @VisibleForTesting - Properties getTemplateBindings() throws IOException { + Properties getTemplateBindings() { Properties dockerProperties = new Properties(); // docker template properties @@ -312,19 +310,15 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { } @VisibleForTesting - List<String> getListEnvs() throws SocketException, UnknownHostException { + List<String> getListEnvs() { // environment variables envs.put("ZEPPELIN_HOME", zeppelinHome); envs.put("ZEPPELIN_CONF_DIR", zeppelinHome + "/conf"); envs.put("ZEPPELIN_FORCE_STOP", "true"); - envs.put("SPARK_HOME", this.CONTAINER_SPARK_HOME); + envs.put("SPARK_HOME", this.containerSparkHome); // set container time zone - String dockerTimeZone = System.getenv("DOCKER_TIME_ZONE"); - if (StringUtils.isBlank(dockerTimeZone)) { - dockerTimeZone = TimeZone.getDefault().getID(); - } - envs.put("TZ", dockerTimeZone); + envs.put("TZ", zconf.getString(ConfVars.ZEPPELIN_DOCKER_TIME_ZONE)); List<String> listEnv = new ArrayList<>(); for (Map.Entry<String, String> entry : this.envs.entrySet()) { @@ -344,7 +338,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { return null; }); } catch (Exception e) { - LOGGER.warn("ignore the exception when shutting down", e); + LOGGER.warn("Ignore the exception when shutting down", e); } } try { @@ -353,7 +347,11 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { // Remove container docker.removeContainer(containerName); - } catch (DockerException | InterruptedException e) { + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + // Restore interrupted state... + Thread.currentThread().interrupt(); + } catch (DockerException e) { LOGGER.error(e.getMessage(), e); } @@ -379,18 +377,26 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { } } - if (isExist == true) { + if (isExist) { LOGGER.info("kill exist container {}", containerName); docker.killContainer(containerName); } - } catch (DockerException | InterruptedException e) { + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + // Restore interrupted state... + Thread.currentThread().interrupt(); + } catch (DockerException e) { LOGGER.error(e.getMessage(), e); } finally { try { - if (isExist == true) { + if (isExist) { docker.removeContainer(containerName); } - } catch (DockerException | InterruptedException e) { + } catch (InterruptedException e) { + LOGGER.error(e.getMessage(), e); + // Restore interrupted state... + Thread.currentThread().interrupt(); + } catch (DockerException e) { LOGGER.error(e.getMessage(), e); } } @@ -406,6 +412,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { return containerPort; } + @Override public boolean isAlive() { //TODO(ZEPPELIN-5876): Implement it more accurately return isRunning(); @@ -479,15 +486,14 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { // 3.5) jdbc interpreter properties keytab file intpKeytab = properties.getProperty("zeppelin.jdbc.keytab.location", ""); } - if (!StringUtils.isBlank(intpKeytab) && !copyFiles.containsKey(intpKeytab)) { + if (!StringUtils.isBlank(intpKeytab)) { LOGGER.info("intpKeytab : {}", intpKeytab); - copyFiles.put(intpKeytab, intpKeytab); + copyFiles.putIfAbsent(intpKeytab, intpKeytab); } // 3.6) zeppelin server keytab file String zeppelinServerKeytab = zconf.getString(ZEPPELIN_SERVER_KERBEROS_KEYTAB); - if (!StringUtils.isBlank(zeppelinServerKeytab) - && !copyFiles.containsKey(zeppelinServerKeytab)) { - copyFiles.put(zeppelinServerKeytab, zeppelinServerKeytab); + if (!StringUtils.isBlank(zeppelinServerKeytab)) { + copyFiles.putIfAbsent(zeppelinServerKeytab, zeppelinServerKeytab); } // 4) hadoop conf dir @@ -499,10 +505,10 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { // 5) spark conf dir if (envs.containsKey("SPARK_CONF_DIR")) { String sparkConfDir = envs.get("SPARK_CONF_DIR"); - rmInContainer(containerId, CONTAINER_SPARK_HOME + "/conf"); - mkdirInContainer(containerId, CONTAINER_SPARK_HOME + "/conf"); - copyFiles.put(sparkConfDir, CONTAINER_SPARK_HOME + "/conf"); - envs.put("SPARK_CONF_DIR", CONTAINER_SPARK_HOME + "/conf"); + rmInContainer(containerId, containerSparkHome + "/conf"); + mkdirInContainer(containerId, containerSparkHome + "/conf"); + copyFiles.put(sparkConfDir, containerSparkHome + "/conf"); + envs.put("SPARK_CONF_DIR", containerSparkHome + "/conf"); } if (uploadLocalLibToContainter){ @@ -528,9 +534,8 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { FileFilterUtils.suffixFileFilter("jar"), null); for (File jarfile : listFiles) { String jarfilePath = jarfile.getAbsolutePath(); - if (!StringUtils.isBlank(jarfilePath) - && !copyFiles.containsKey(jarfilePath)) { - copyFiles.put(jarfilePath, jarfilePath); + if (!StringUtils.isBlank(jarfilePath)) { + copyFiles.putIfAbsent(jarfilePath, jarfilePath); } } } @@ -547,19 +552,15 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { String tarFile = file2Tar(copyFiles); // copy tar to ZEPPELIN_CONTAINER_DIR, auto unzip - InputStream inputStream = new FileInputStream(tarFile); - try { + try (InputStream inputStream = new FileInputStream(tarFile)) { docker.copyToContainer(inputStream, containerId, CONTAINER_UPLOAD_TAR_DIR); - } finally { - inputStream.close(); } // copy all files in CONTAINER_UPLOAD_TAR_DIR to the root directory cpdirInContainer(containerId, CONTAINER_UPLOAD_TAR_DIR + "/*", "/"); // delete tar file in the local - File fileTar = new File(tarFile); - fileTar.delete(); + Files.delete(Paths.get(tarFile)); } private void mkdirInContainer(String containerId, String path) @@ -583,7 +584,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { private void execInContainer(String containerId, String execCommand, boolean logout) throws DockerException, InterruptedException { - LOGGER.info("exec container commmand: " + execCommand); + LOGGER.info("exec container commmand: {}", execCommand); final String[] command = {"sh", "-c", execCommand}; final ExecCreation execCreation = docker.execCreate( @@ -622,15 +623,10 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { } private String getZeppelinHome() throws IOException { - String zeppelinHome = zconf.getZeppelinHome(); - if (System.getenv("ZEPPELIN_HOME") != null) { - zeppelinHome = System.getenv("ZEPPELIN_HOME"); - } - // check zeppelinHome is exist - File fileZeppelinHome = new File(zeppelinHome); + File fileZeppelinHome = new File(zconf.getZeppelinHome()); if (fileZeppelinHome.exists() && fileZeppelinHome.isDirectory()) { - return zeppelinHome; + return zconf.getZeppelinHome(); } throw new IOException("Can't find zeppelin home path!"); diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/utils/TarUtils.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/utils/TarUtils.java index 343f3eed11..6a8b7f92f4 100644 --- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/utils/TarUtils.java +++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/utils/TarUtils.java @@ -32,39 +32,46 @@ import java.io.IOException; import java.util.List; public class TarUtils { + + private TarUtils() { + throw new IllegalStateException("Utility class"); + } + private static final Logger LOGGER = LoggerFactory.getLogger(TarUtils.class); public static void compress(String name, List<TarFileEntry> files) throws IOException { - try (TarArchiveOutputStream out = getTarArchiveOutputStream(name)){ - for (TarFileEntry tarFileEntry : files){ - addToArchiveCompression(out, tarFileEntry.getFile(), tarFileEntry.getArchivePath()); + try (FileOutputStream fileOutputStream = new FileOutputStream(name)) { + try (TarArchiveOutputStream out = getTarArchiveOutputStream(fileOutputStream)) { + for (TarFileEntry tarFileEntry : files) { + addToArchiveCompression(out, tarFileEntry.getFile(), tarFileEntry.getArchivePath()); + } } } } public static void decompress(String in, File out) throws IOException { - FileInputStream fileInputStream = new FileInputStream(in); - GzipCompressorInputStream gzipInputStream = new GzipCompressorInputStream(fileInputStream); + try (FileInputStream fileInputStream = new FileInputStream(in)) { + GzipCompressorInputStream gzipInputStream = new GzipCompressorInputStream(fileInputStream); - try (TarArchiveInputStream fin = new TarArchiveInputStream(gzipInputStream)){ - TarArchiveEntry entry; - while ((entry = fin.getNextTarEntry()) != null) { - if (entry.isDirectory()) { - continue; - } - File curfile = new File(out, entry.getName()); - File parent = curfile.getParentFile(); - if (!parent.exists()) { - parent.mkdirs(); + try (TarArchiveInputStream fin = new TarArchiveInputStream(gzipInputStream)) { + TarArchiveEntry entry; + while ((entry = fin.getNextTarEntry()) != null) { + if (entry.isDirectory()) { + continue; + } + File curfile = new File(out, entry.getName()); + File parent = curfile.getParentFile(); + if (!parent.exists()) { + parent.mkdirs(); + } + IOUtils.copy(fin, new FileOutputStream(curfile)); } - IOUtils.copy(fin, new FileOutputStream(curfile)); } } } - private static TarArchiveOutputStream getTarArchiveOutputStream(String name) + private static TarArchiveOutputStream getTarArchiveOutputStream(FileOutputStream fileOutputStream) throws IOException { - FileOutputStream fileOutputStream = new FileOutputStream(name); GzipCompressorOutputStream gzipOutputStream = new GzipCompressorOutputStream(fileOutputStream); TarArchiveOutputStream taos = new TarArchiveOutputStream(gzipOutputStream); @@ -82,7 +89,7 @@ public class TarUtils { throws IOException { if (file.isFile()){ String archivePath = "." + dir; - LOGGER.info("archivePath = " + archivePath); + LOGGER.info("archivePath = {}", archivePath); out.putArchiveEntry(new TarArchiveEntry(file, archivePath)); try (FileInputStream in = new FileInputStream(file)) { IOUtils.copy(in, out); @@ -97,7 +104,7 @@ public class TarUtils { } } } else { - LOGGER.error(file.getName() + " is not supported"); + LOGGER.error("{} is not supported", file.getName()); } } } diff --git a/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java b/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java index 4a7a39de6d..6b33b86afa 100644 --- a/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java +++ b/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java @@ -17,36 +17,29 @@ package org.apache.zeppelin.interpreter.launcher; import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.interpreter.InterpreterOption; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; -@RunWith(PowerMockRunner.class) -@PrepareForTest({System.class, DockerInterpreterProcess.class}) -@PowerMockIgnore( {"javax.management.*"}) -public class DockerInterpreterProcessTest { - private static final Logger LOGGER = LoggerFactory.getLogger(DockerInterpreterProcessTest.class); +class DockerInterpreterProcessTest { - protected static ZeppelinConfiguration zconf = ZeppelinConfiguration.create(); + protected static ZeppelinConfiguration zconf = spy(ZeppelinConfiguration.create()); @Test - public void testCreateIntpProcess() throws IOException { + void testCreateIntpProcess() throws IOException { DockerInterpreterLauncher launcher = new DockerInterpreterLauncher(zconf, null); Properties properties = new Properties(); @@ -62,44 +55,42 @@ public class DockerInterpreterProcessTest { DockerInterpreterProcess interpreterProcess = (DockerInterpreterProcess) client; assertEquals("name", interpreterProcess.getInterpreterSettingName()); - assertEquals(interpreterProcess.CONTAINER_SPARK_HOME, "/spark"); - assertEquals(interpreterProcess.uploadLocalLibToContainter, true); - assertNotEquals(interpreterProcess.DOCKER_HOST, "http://my-docker-host:2375"); + assertEquals("/spark", interpreterProcess.containerSparkHome); + assertTrue(interpreterProcess.uploadLocalLibToContainter); + assertNotEquals("http://my-docker-host:2375", interpreterProcess.dockerHost); } @Test - public void testEnv() throws IOException { - PowerMockito.mockStatic(System.class); - PowerMockito.when(System.getenv("CONTAINER_SPARK_HOME")).thenReturn("my-spark-home"); - PowerMockito.when(System.getenv("UPLOAD_LOCAL_LIB_TO_CONTAINTER")).thenReturn("false"); - PowerMockito.when(System.getenv("DOCKER_HOST")).thenReturn("http://my-docker-host:2375"); + void testEnv() throws IOException { + when(zconf.getString(ConfVars.ZEPPELIN_DOCKER_CONTAINER_SPARK_HOME)).thenReturn("my-spark-home"); + when(zconf.getBoolean(ConfVars.ZEPPELIN_DOCKER_UPLOAD_LOCAL_LIB_TO_CONTAINTER)).thenReturn(false); + when(zconf.getString(ConfVars.ZEPPELIN_DOCKER_HOST)).thenReturn("http://my-docker-host:2375"); Properties properties = new Properties(); properties.setProperty( - ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000"); - + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000"); HashMap<String, String> envs = new HashMap<String, String>(); envs.put("MY_ENV1", "V1"); - DockerInterpreterProcess intp = new DockerInterpreterProcess( - zconf, - "interpreter-container:1.0", - "shared_process", - "sh", - "shell", - properties, - envs, - "zeppelin.server.hostname", - 12320, - 5000, 10); - - assertEquals(intp.CONTAINER_SPARK_HOME, "my-spark-home"); - assertEquals(intp.uploadLocalLibToContainter, false); - assertEquals(intp.DOCKER_HOST, "http://my-docker-host:2375"); + DockerInterpreterProcess intp = spy(new DockerInterpreterProcess( + zconf, + "interpreter-container:1.0", + "shared_process", + "sh", + "shell", + properties, + envs, + "zeppelin.server.hostname", + 12320, + 5000, 10)); + + assertEquals("my-spark-home", intp.containerSparkHome); + assertFalse(intp.uploadLocalLibToContainter); + assertEquals("http://my-docker-host:2375", intp.dockerHost); } @Test - public void testTemplateBindings() throws IOException { + void testTemplateBindings() throws IOException { Properties properties = new Properties(); properties.setProperty( ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000"); @@ -120,28 +111,28 @@ public class DockerInterpreterProcessTest { 5000, 10); Properties dockerProperties = intp.getTemplateBindings(); - assertEquals(dockerProperties.size(), 10); - - assertTrue(null != dockerProperties.get("CONTAINER_ZEPPELIN_HOME")); - assertTrue(null != dockerProperties.get("zeppelin.interpreter.container.image")); - assertTrue(null != dockerProperties.get("zeppelin.interpreter.group.id")); - assertTrue(null != dockerProperties.get("zeppelin.interpreter.group.name")); - assertTrue(null != dockerProperties.get("zeppelin.interpreter.setting.name")); - assertTrue(null != dockerProperties.get("zeppelin.interpreter.localRepo")); - assertTrue(null != dockerProperties.get("zeppelin.interpreter.rpc.portRange")); - assertTrue(null != dockerProperties.get("zeppelin.server.rpc.host")); - assertTrue(null != dockerProperties.get("zeppelin.server.rpc.portRange")); - assertTrue(null != dockerProperties.get("zeppelin.interpreter.connect.timeout")); + assertEquals(10, dockerProperties.size()); + + assertNotNull(dockerProperties.get("CONTAINER_ZEPPELIN_HOME")); + assertNotNull(dockerProperties.get("zeppelin.interpreter.container.image")); + assertNotNull(dockerProperties.get("zeppelin.interpreter.group.id")); + assertNotNull(dockerProperties.get("zeppelin.interpreter.group.name")); + assertNotNull(dockerProperties.get("zeppelin.interpreter.setting.name")); + assertNotNull(dockerProperties.get("zeppelin.interpreter.localRepo")); + assertNotNull(dockerProperties.get("zeppelin.interpreter.rpc.portRange")); + assertNotNull(dockerProperties.get("zeppelin.server.rpc.host")); + assertNotNull(dockerProperties.get("zeppelin.server.rpc.portRange")); + assertNotNull(dockerProperties.get("zeppelin.interpreter.connect.timeout")); List<String> listEnvs = intp.getListEnvs(); - assertEquals(listEnvs.size(), 6); + assertEquals(6, listEnvs.size()); Map<String, String> mapEnv = new HashMap<>(); for (int i = 0; i < listEnvs.size(); i++) { String env = listEnvs.get(i); String kv[] = env.split("="); mapEnv.put(kv[0], kv[1]); } - assertEquals(mapEnv.size(), 6); + assertEquals(6, mapEnv.size()); assertTrue(mapEnv.containsKey("ZEPPELIN_HOME")); assertTrue(mapEnv.containsKey("ZEPPELIN_CONF_DIR")); assertTrue(mapEnv.containsKey("ZEPPELIN_FORCE_STOP"));