This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new a57b4b22a7 [ZEPPELIN-5995] Update Kubernetes Library and hopefully fix flaky tests (#4712) a57b4b22a7 is described below commit a57b4b22a74cac31498982380204bb38ac7cbdfe Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Mon Feb 19 16:57:32 2024 +0100 [ZEPPELIN-5995] Update Kubernetes Library and hopefully fix flaky tests (#4712) --- pom.xml | 8 +++ zeppelin-plugins/launcher/k8s-standard/pom.xml | 7 ++- .../launcher/K8sRemoteInterpreterProcessTest.java | 73 +++++++++++----------- .../K8sStandardInterpreterLauncherTest.java | 2 + .../interpreter/launcher/PodPhaseWatcherTest.java | 27 ++++++-- 5 files changed, 72 insertions(+), 45 deletions(-) diff --git a/pom.xml b/pom.xml index f5ad751a46..49b769035a 100644 --- a/pom.xml +++ b/pom.xml @@ -161,6 +161,7 @@ <junit.jupiter.version>5.7.1</junit.jupiter.version> <mockito.version>3.12.4</mockito.version> <assertj.version>1.7.0</assertj.version> + <awaitility.version>4.2.0</awaitility.version> <!-- plugin versions --> <plugin.antrun.version>1.8</plugin.antrun.version> @@ -1160,6 +1161,13 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>${awaitility.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.testcontainers</groupId> <artifactId>neo4j</artifactId> diff --git a/zeppelin-plugins/launcher/k8s-standard/pom.xml b/zeppelin-plugins/launcher/k8s-standard/pom.xml index 7e8042f8c6..dbf747d907 100644 --- a/zeppelin-plugins/launcher/k8s-standard/pom.xml +++ b/zeppelin-plugins/launcher/k8s-standard/pom.xml @@ -35,7 +35,7 @@ <properties> <plugin.name>Launcher/K8sStandardInterpreterLauncher</plugin.name> - <kubernetes.client.version>5.4.1</kubernetes.client.version> + <kubernetes.client.version>5.12.4</kubernetes.client.version> <jinjava.version>2.5.4</jinjava.version> </properties> @@ -74,6 +74,11 @@ 
<version>${kubernetes.client.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java index 17a8b89027..1d9abd8b1b 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java @@ -21,13 +21,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; +import static org.awaitility.Awaitility.await; import java.io.File; import java.io.IOException; import java.net.URL; -import java.time.Instant; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -79,6 +80,7 @@ class K8sRemoteInterpreterProcessTest { assertEquals("12321:12321", intp.getInterpreterPortRange()); assertEquals(22321, intp.getSparkDriverPort()); assertEquals(22322, intp.getSparkBlockManagerPort()); + intp.close(); } @Test @@ -130,6 +132,7 @@ class K8sRemoteInterpreterProcessTest { envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs"); assertTrue(envs.containsKey("SERVICE_DOMAIN")); assertTrue(envs.containsKey("ZEPPELIN_HOME")); + intp.close(); } @Test @@ 
-189,6 +192,7 @@ class K8sRemoteInterpreterProcessTest { assertTrue(zeppelinSparkConf.contains("spark.jars.ivy=my_ivy_path")); assertFalse(zeppelinSparkConf.contains("--proxy-user")); assertTrue(intp.isSpark()); + intp.close(); } @Test @@ -242,6 +246,7 @@ class K8sRemoteInterpreterProcessTest { assertTrue(zeppelinSparkConf.contains("spark.blockManager.port=" + intp.getSparkBlockManagerPort())); assertTrue(zeppelinSparkConf.contains("--proxy-user|mytestUser")); assertTrue(intp.isSpark()); + intp.close(); } @Test @@ -286,6 +291,7 @@ class K8sRemoteInterpreterProcessTest { String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS"); assertFalse(sparkSubmitOptions.contains("--proxy-user")); assertTrue(intp.isSpark()); + intp.close(); } @Test @@ -329,6 +335,7 @@ class K8sRemoteInterpreterProcessTest { 4040, "zeppelin-server", "my.domain.com")); + intp.close(); } @Test @@ -365,6 +372,7 @@ class K8sRemoteInterpreterProcessTest { // then assertEquals("1", p.get("zeppelin.k8s.interpreter.cores")); assertEquals("1408Mi", p.get("zeppelin.k8s.interpreter.memory")); + intp.close(); } @Test @@ -475,16 +483,14 @@ class K8sRemoteInterpreterProcessTest { service .submit(podStatusSimulator); // should throw an IOException - try { + IOException e = assertThrows(IOException.class, () -> { intp.start("TestUser"); - fail("We excepting an IOException"); - } catch (IOException e) { - assertNotNull(e); - // Check that the Pod is deleted - assertNull( + }); + assertNotNull(e); + // Check that the Pod is deleted + assertNull( client.pods().inNamespace(intp.getInterpreterNamespace()).withName(intp.getPodName()) .get()); - } } @Test @@ -525,9 +531,7 @@ class K8sRemoteInterpreterProcessTest { service.submit(() -> { try { intp.start("TestUser"); - fail("We interrupt, this line of code should not be executed."); } catch (IOException e) { - fail("We interrupt, this line of code should not be executed."); } }); // wait a little bit @@ -575,34 +579,27 @@ class K8sRemoteInterpreterProcessTest { 
@Override public void run() { - try { - Instant timeoutTime = Instant.now().plusSeconds(10); - while (timeoutTime.isAfter(Instant.now())) { - Pod pod = client.pods().inNamespace(namespace).withName(podName).get(); - if (pod != null) { - TimeUnit.SECONDS.sleep(1); - // Update Pod to "pending" phase - pod.setStatus(new PodStatus(null, null, null, null, null, null, null, firstPhase, - null, - null, null, null, null)); - client.pods().inNamespace(namespace).updateStatus(pod); - // Update Pod to "Running" phase - pod.setStatus(new PodStatus(null, null, null, null, null, null, null, secondPhase, - null, - null, null, null, null)); - client.pods().inNamespace(namespace).updateStatus(pod); - TimeUnit.SECONDS.sleep(1); - if (successfulStart) { - process.processStarted(12320, "testing"); - } - break; - } else { - TimeUnit.MILLISECONDS.sleep(100); - } - } - } catch (InterruptedException e) { - // Do nothing + await().until(() -> client.pods().inNamespace(namespace).withName(podName).get() != null); + // Pod is present set first phase + Pod pod = client.pods().inNamespace(namespace).withName(podName).get(); + pod.setStatus(new PodStatus(null, null, null, null, null, null, null, firstPhase, + null, + null, null, null, null)); + client.pods().inNamespace(namespace).replaceStatus(pod); + await().pollDelay(Duration.ofMillis(200)).until(() -> firstPhase.equals( + client.pods().inNamespace(namespace).withName(podName).get().getStatus().getPhase())); + // Set second Phase + pod = client.pods().inNamespace(namespace).withName(podName).get(); + pod.setStatus(new PodStatus(null, null, null, null, null, null, null, secondPhase, + null, + null, null, null, null)); + client.pods().inNamespace(namespace).replaceStatus(pod); + await().pollDelay(Duration.ofMillis(200)).until(() -> secondPhase.equals( + client.pods().inNamespace(namespace).withName(podName).get().getStatus().getPhase())); + if (successfulStart) { + process.processStarted(12320, "testing"); } } } + } diff --git 
a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java index cf9849c035..160c191ff3 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java @@ -99,6 +99,7 @@ class K8sStandardInterpreterLauncherTest { K8sRemoteInterpreterProcess process = (K8sRemoteInterpreterProcess) client; assertTrue(process.isSpark()); assertTrue(process.prepareZeppelinSparkConf(context.getUserName()).contains("--proxy-user|user1")); + process.close(); } @Test @@ -132,5 +133,6 @@ class K8sStandardInterpreterLauncherTest { K8sRemoteInterpreterProcess process = (K8sRemoteInterpreterProcess) client; assertTrue(process.isSpark()); assertFalse(process.prepareZeppelinSparkConf(context.getUserName()).contains("--proxy-user user1")); + process.close(); } } diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java index e8126f006c..f50a54a076 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java @@ -22,7 +22,10 @@ package org.apache.zeppelin.interpreter.launcher; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static 
org.awaitility.Awaitility.await; +import java.time.Duration; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; @@ -46,7 +49,9 @@ class PodPhaseWatcherTest { void testPhase() throws InterruptedException { // CREATE client.pods().inNamespace("ns1") - .create(new PodBuilder().withNewMetadata().withName("pod1").endMetadata().build()); + .create(new PodBuilder().withNewMetadata().withName("pod1").endMetadata().withNewStatus() + .endStatus().build()); + await().until(isPodAvailable("pod1")); // READ PodList podList = client.pods().inNamespace("ns1").list(); assertNotNull(podList); @@ -56,23 +61,33 @@ class PodPhaseWatcherTest { PodPhaseWatcher podWatcher = new PodPhaseWatcher( phase -> StringUtils.equalsAnyIgnoreCase(phase, "Succeeded", "Failed", "Running")); try (Watch watch = client.pods().inNamespace("ns1").withName("pod1").watch(podWatcher)) { - // Update Pod to "pending" phase pod.setStatus(new PodStatus(null, null, null, null, null, null, null, "Pending", null, null, null, null, null)); - pod = client.pods().inNamespace("ns1").updateStatus(pod); + pod = client.pods().inNamespace("ns1").replaceStatus(pod); // Wait a little bit, till update is applied - Thread.sleep(1000); + await().pollDelay(Duration.ofSeconds(1)) + .until(isPodPhase(pod.getMetadata().getName(), "Pending")); // Update Pod to "Running" phase pod.setStatus(new PodStatusBuilder(new PodStatus(null, null, null, null, null, null, null, "Running", null, null, null, null, null)).build()); - client.pods().inNamespace("ns1").updateStatus(pod); - + client.pods().inNamespace("ns1").replaceStatus(pod); + await().pollDelay(Duration.ofSeconds(1)) + .until(isPodPhase(pod.getMetadata().getName(), "Running")); assertTrue(podWatcher.getCountDownLatch().await(1, TimeUnit.SECONDS)); } } + private Callable<Boolean> isPodPhase(String pod, String phase) { + return () -> phase + 
.equals(client.pods().inNamespace("ns1").withName(pod).get().getStatus().getPhase()); + } + + private Callable<Boolean> isPodAvailable(String pod) { + return () -> client.pods().inNamespace("ns1").withName(pod).get() != null; + } + @Test void testPhaseWithError() throws InterruptedException { // CREATE