This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 8157fcd [ZEPPELIN-5353] Fix K8s tests 8157fcd is described below commit 8157fcd535ccc82153699daad2b5666aedf0cc5f Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Tue May 4 09:28:43 2021 +0200 [ZEPPELIN-5353] Fix K8s tests ### What is this PR for? This PR updates the K8s Java library and fixes the failing k8s unit tests. ### What type of PR is it? - Hot Fix ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5353 ### How should this be tested? * CI ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Philipp Dallig <philipp.dal...@gmail.com> Closes #4107 from Reamer/ci_k8s and squashes the following commits: af5943262 [Philipp Dallig] Resolve flapping tests in a shared CI environment 827dbfe60 [Philipp Dallig] Fix for the javax.net.ssl.SSLException that occurs with the disabling of TLS1.0 and TLS1.1 by JDK 8u292. 
See JDK-8258598 c1e37376a [Philipp Dallig] Update kubernetes.client.version to 5.3.1 a9063765c [Philipp Dallig] Some test cleanup (cherry picked from commit 91bfbf9db1b2ca915644c264694e0add03a69e61) Signed-off-by: Philipp Dallig <philipp.dal...@gmail.com> --- zeppelin-plugins/launcher/k8s-standard/pom.xml | 2 +- .../launcher/K8sRemoteInterpreterProcess.java | 2 +- .../interpreter/launcher/PodPhaseWatcher.java | 10 ++++- .../launcher/K8sRemoteInterpreterProcessTest.java | 48 +++++++++++----------- .../interpreter/launcher/PodPhaseWatcherTest.java | 2 +- .../test/resources/k8s-specs/interpreter-spec.yaml | 15 ------- .../src/test/resources/log4j.properties | 31 ++++++++++++++ .../org/apache/zeppelin/notebook/NotebookTest.java | 8 ++-- 8 files changed, 72 insertions(+), 46 deletions(-) diff --git a/zeppelin-plugins/launcher/k8s-standard/pom.xml b/zeppelin-plugins/launcher/k8s-standard/pom.xml index e0d1d2d..34df72e 100644 --- a/zeppelin-plugins/launcher/k8s-standard/pom.xml +++ b/zeppelin-plugins/launcher/k8s-standard/pom.xml @@ -37,7 +37,7 @@ <properties> <plugin.name>Launcher/K8sStandardInterpreterLauncher</plugin.name> - <kubernetes.client.version>4.10.2</kubernetes.client.version> + <kubernetes.client.version>5.3.1</kubernetes.client.version> <jinjava.version>2.5.4</jinjava.version> </properties> diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java index e44a5af..80a6f15 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java @@ -257,7 +257,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess 
K8sSpecTemplate specTemplate = new K8sSpecTemplate(); specTemplate.loadProperties(templateProperties); String template = specTemplate.render(path); - ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata, Boolean> k8sObjects = client.load(IOUtils.toInputStream(template, StandardCharsets.UTF_8)); + ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata> k8sObjects = client.load(IOUtils.toInputStream(template, StandardCharsets.UTF_8)); LOGGER.info("Apply {} with {} K8s Objects", path.getAbsolutePath(), k8sObjects.get().size()); LOGGER.debug(template); if (delete) { diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcher.java index ddf23f5..fe2d1ab 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcher.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcher.java @@ -26,8 +26,8 @@ import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodStatus; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; public class PodPhaseWatcher implements Watcher<Pod> { private static final Logger LOGGER = LoggerFactory.getLogger(PodPhaseWatcher.class); @@ -49,7 +49,7 @@ public class PodPhaseWatcher implements Watcher<Pod> { } @Override - public void onClose(KubernetesClientException cause) { + public void onClose(WatcherException cause) { if (cause != null) { LOGGER.error("PodWatcher exits abnormally", cause); } @@ -57,6 +57,12 @@ public class PodPhaseWatcher implements Watcher<Pod> { countDownLatch.countDown(); } + @Override + public void onClose() { + // 
always count down, so threads that are waiting will continue + countDownLatch.countDown(); + } + public CountDownLatch getCountDownLatch() { return countDownLatch; } diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java index 9414411..dbc4555 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.net.URL; import java.time.Instant; import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -45,13 +46,13 @@ import io.fabric8.kubernetes.client.server.mock.KubernetesServer; public class K8sRemoteInterpreterProcessTest { @Rule - public KubernetesServer server = new KubernetesServer(true, true); + public KubernetesServer server = new KubernetesServer(false, true); @Test public void testPredefinedPortNumbers() { // given Properties properties = new Properties(); - HashMap<String, String> envs = new HashMap<String, String>(); + Map<String, String> envs = new HashMap<>(); K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( server.getClient(), @@ -81,11 +82,11 @@ public class K8sRemoteInterpreterProcessTest { } @Test - public void testGetTemplateBindings() throws IOException { + public void testGetTemplateBindings() { // given Properties properties = new Properties(); properties.put("my.key1", "v1"); - HashMap<String, String> envs = new HashMap<String, String>(); + Map<String, String> envs = new HashMap<>(); envs.put("MY_ENV1", "V1"); 
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( @@ -118,7 +119,7 @@ public class K8sRemoteInterpreterProcessTest { assertEquals("shared_process", p.get("zeppelin.k8s.interpreter.group.id")); assertEquals("sh", p.get("zeppelin.k8s.interpreter.group.name")); assertEquals("shell", p.get("zeppelin.k8s.interpreter.setting.name")); - assertEquals(true , p.containsKey("zeppelin.k8s.interpreter.localRepo")); + assertTrue(p.containsKey("zeppelin.k8s.interpreter.localRepo")); assertEquals("12321:12321" , p.get("zeppelin.k8s.interpreter.rpc.portRange")); assertEquals("zeppelin.server.service" , p.get("zeppelin.k8s.server.rpc.service")); assertEquals(12320 , p.get("zeppelin.k8s.server.rpc.portRange")); @@ -131,12 +132,12 @@ public class K8sRemoteInterpreterProcessTest { } @Test - public void testGetTemplateBindingsForSpark() throws IOException { + public void testGetTemplateBindingsForSpark() { // given Properties properties = new Properties(); properties.put("my.key1", "v1"); properties.put("spark.master", "k8s://http://api"); - HashMap<String, String> envs = new HashMap<String, String>(); + Map<String, String> envs = new HashMap<>(); envs.put("MY_ENV1", "V1"); envs.put("SPARK_SUBMIT_OPTIONS", "my options"); envs.put("SERVICE_DOMAIN", "mydomain"); @@ -183,12 +184,12 @@ public class K8sRemoteInterpreterProcessTest { } @Test - public void testGetTemplateBindingsForSparkWithProxyUser() throws IOException { + public void testGetTemplateBindingsForSparkWithProxyUser() { // given Properties properties = new Properties(); properties.put("my.key1", "v1"); properties.put("spark.master", "k8s://http://api"); - HashMap<String, String> envs = new HashMap<String, String>(); + Map<String, String> envs = new HashMap<>(); envs.put("MY_ENV1", "V1"); envs.put("SPARK_SUBMIT_OPTIONS", "my options"); envs.put("SERVICE_DOMAIN", "mydomain"); @@ -234,12 +235,12 @@ public class K8sRemoteInterpreterProcessTest { } @Test - public void 
testGetTemplateBindingsForSparkWithProxyUserAnonymous() throws IOException { + public void testGetTemplateBindingsForSparkWithProxyUserAnonymous() { // given Properties properties = new Properties(); properties.put("my.key1", "v1"); properties.put("spark.master", "k8s://http://api"); - HashMap<String, String> envs = new HashMap<String, String>(); + Map<String, String> envs = new HashMap<>(); envs.put("MY_ENV1", "V1"); envs.put("SPARK_SUBMIT_OPTIONS", "my options"); envs.put("SERVICE_DOMAIN", "mydomain"); @@ -281,7 +282,7 @@ public class K8sRemoteInterpreterProcessTest { public void testSparkUiWebUrlTemplate() { // given Properties properties = new Properties(); - HashMap<String, String> envs = new HashMap<String, String>(); + Map<String, String> envs = new HashMap<>(); envs.put("SERVICE_DOMAIN", "mydomain"); K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( @@ -326,7 +327,7 @@ public class K8sRemoteInterpreterProcessTest { Properties properties = new Properties(); properties.put("spark.driver.memory", "1g"); properties.put("spark.driver.cores", "1"); - HashMap<String, String> envs = new HashMap<String, String>(); + Map<String, String> envs = new HashMap<>(); envs.put("SERVICE_DOMAIN", "mydomain"); K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( @@ -363,7 +364,7 @@ public class K8sRemoteInterpreterProcessTest { properties.put("spark.driver.memory", "1g"); properties.put("spark.driver.memoryOverhead", "256m"); properties.put("spark.driver.cores", "5"); - HashMap<String, String> envs = new HashMap<String, String>(); + Map<String, String> envs = new HashMap<>(); envs.put("SERVICE_DOMAIN", "mydomain"); K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess( @@ -394,10 +395,10 @@ public class K8sRemoteInterpreterProcessTest { } @Test - public void testK8sStartSuccessful() throws IOException, InterruptedException { + public void testK8sStartSuccessful() throws IOException { // given Properties properties = new 
Properties(); - HashMap<String, String> envs = new HashMap<String, String>(); + Map<String, String> envs = new HashMap<>(); envs.put("SERVICE_DOMAIN", "mydomain"); URL url = Thread.currentThread().getContextClassLoader() .getResource("k8s-specs/interpreter-spec.yaml"); @@ -430,10 +431,10 @@ public class K8sRemoteInterpreterProcessTest { } @Test - public void testK8sStartFailed() throws IOException, InterruptedException { + public void testK8sStartFailed() { // given Properties properties = new Properties(); - HashMap<String, String> envs = new HashMap<String, String>(); + Map<String, String> envs = new HashMap<>(); envs.put("SERVICE_DOMAIN", "mydomain"); URL url = Thread.currentThread().getContextClassLoader() .getResource("k8s-specs/interpreter-spec.yaml"); @@ -459,7 +460,7 @@ public class K8sRemoteInterpreterProcessTest { true); PodStatusSimulator podStatusSimulator = new PodStatusSimulator(server.getClient(), intp.getNamespace(), intp.getPodName(), intp); podStatusSimulator.setSecondPhase("Failed"); - podStatusSimulator.setSuccessfullStart(false); + podStatusSimulator.setSuccessfulStart(false); ExecutorService service = Executors.newFixedThreadPool(1); service .submit(podStatusSimulator); @@ -477,10 +478,10 @@ public class K8sRemoteInterpreterProcessTest { } @Test - public void testK8sStartTimeoutPending() throws IOException, InterruptedException { + public void testK8sStartTimeoutPending() throws InterruptedException { // given Properties properties = new Properties(); - HashMap<String, String> envs = new HashMap<String, String>(); + Map<String, String> envs = new HashMap<>(); envs.put("SERVICE_DOMAIN", "mydomain"); URL url = Thread.currentThread().getContextClassLoader() .getResource("k8s-specs/interpreter-spec.yaml"); @@ -507,7 +508,7 @@ public class K8sRemoteInterpreterProcessTest { PodStatusSimulator podStatusSimulator = new PodStatusSimulator(server.getClient(), intp.getNamespace(), intp.getPodName(), intp); podStatusSimulator.setFirstPhase("Pending"); 
podStatusSimulator.setSecondPhase("Pending"); - podStatusSimulator.setSuccessfullStart(false); + podStatusSimulator.setSuccessfulStart(false); ExecutorService service = Executors.newFixedThreadPool(2); service .submit(podStatusSimulator); @@ -558,7 +559,7 @@ public class K8sRemoteInterpreterProcessTest { public void setSecondPhase(String phase) { this.secondPhase = phase; } - public void setSuccessfullStart(boolean successful) { + public void setSuccessfulStart(boolean successful) { this.successfulStart = successful; } @@ -590,6 +591,7 @@ public class K8sRemoteInterpreterProcessTest { } } } catch (InterruptedException e) { + // Do nothing } } } diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java index fb06768..aa08352 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java @@ -38,7 +38,7 @@ import io.fabric8.kubernetes.client.server.mock.KubernetesServer; public class PodPhaseWatcherTest { @Rule - public KubernetesServer server = new KubernetesServer(true, true); + public KubernetesServer server = new KubernetesServer(false, true); @Test public void testPhase() throws InterruptedException { diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml b/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml index 116b0df..94717c2 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml @@ -73,21 +73,6 @@ spec: limits: cpu: 
"{{zeppelin.k8s.interpreter.cores}}" {% endif %} - {% if zeppelin.k8s.interpreter.group.name == "spark" %} - volumeMounts: - - name: spark-home - mountPath: /spark - initContainers: - - name: spark-home-init - image: {{zeppelin.k8s.spark.container.image}} - command: ["sh", "-c", "cp -r /opt/spark/* /spark/"] - volumeMounts: - - name: spark-home - mountPath: /spark - volumes: - - name: spark-home - emptyDir: {} - {% endif %} --- kind: Service apiVersion: v1 diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/resources/log4j.properties b/zeppelin-plugins/launcher/k8s-standard/src/test/resources/log4j.properties new file mode 100644 index 0000000..c846ba5 --- /dev/null +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/resources/log4j.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c:%L - %m%n +#log4j.appender.stdout.layout.ConversionPattern= +#%5p [%t] (%F:%L) - %m%n +#%-4r [%t] %-5p %c %x - %m%n +# + +# Root logger option +log4j.rootLogger=INFO, stdout +#log4j.logger.org.apache.zeppelin.interpreter=DEBUG +log4j.io.fabric8.kubernetes.client.Config=INFO, stdout diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index c0980b4..c37b2e4 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -591,7 +591,8 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo public void testSchedulePoolUsage() throws InterruptedException, IOException { final int timeout = 30; final String everySecondCron = "* * * * * ?"; - final CountDownLatch jobsToExecuteCount = new CountDownLatch(8); + // each run starts a new JVM and the job takes about ~5 seconds + final CountDownLatch jobsToExecuteCount = new CountDownLatch(5); final Note note = notebook.createNote("note1", anonymous); executeNewParagraphByCron(note, everySecondCron); @@ -655,8 +656,9 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_CRON_FOLDERS.getVarName(), "/System"); try { - final int timeout = 20; + final int timeout = 30; final String everySecondCron = "* * * * * ?"; + // each run starts a new JVM and the job takes about ~5 seconds final CountDownLatch jobsToExecuteCount = new CountDownLatch(5); final Note note = notebook.createNote("note1", anonymous); @@ -964,7 +966,7 @@ public class 
NotebookTest extends AbstractInterpreterTest implements ParagraphJo assertNull(registry.get("o1", note.getId(), null)); assertNull(registry.get("o2", note.getId(), p1.getId())); - // global object sould be remained + // global object should be remained assertNotNull(registry.get("o3", null, null)); }