This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new a57b4b22a7 [ZEPPELIN-5995] Update Kubernetes Library and hopefully fix 
flaky tests (#4712)
a57b4b22a7 is described below

commit a57b4b22a74cac31498982380204bb38ac7cbdfe
Author: Philipp Dallig <philipp.dal...@gmail.com>
AuthorDate: Mon Feb 19 16:57:32 2024 +0100

    [ZEPPELIN-5995] Update Kubernetes Library and hopefully fix flaky tests 
(#4712)
---
 pom.xml                                            |  8 +++
 zeppelin-plugins/launcher/k8s-standard/pom.xml     |  7 ++-
 .../launcher/K8sRemoteInterpreterProcessTest.java  | 73 +++++++++++-----------
 .../K8sStandardInterpreterLauncherTest.java        |  2 +
 .../interpreter/launcher/PodPhaseWatcherTest.java  | 27 ++++++--
 5 files changed, 72 insertions(+), 45 deletions(-)

diff --git a/pom.xml b/pom.xml
index f5ad751a46..49b769035a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -161,6 +161,7 @@
     <junit.jupiter.version>5.7.1</junit.jupiter.version>
     <mockito.version>3.12.4</mockito.version>
     <assertj.version>1.7.0</assertj.version>
+    <awaitility.version>4.2.0</awaitility.version>
 
     <!-- plugin versions -->
     <plugin.antrun.version>1.8</plugin.antrun.version>
@@ -1160,6 +1161,13 @@
         <scope>test</scope>
       </dependency>
 
+      <dependency>
+        <groupId>org.awaitility</groupId>
+        <artifactId>awaitility</artifactId>
+        <version>${awaitility.version}</version>
+        <scope>test</scope>
+      </dependency>
+
       <dependency>
         <groupId>org.testcontainers</groupId>
         <artifactId>neo4j</artifactId>
diff --git a/zeppelin-plugins/launcher/k8s-standard/pom.xml 
b/zeppelin-plugins/launcher/k8s-standard/pom.xml
index 7e8042f8c6..dbf747d907 100644
--- a/zeppelin-plugins/launcher/k8s-standard/pom.xml
+++ b/zeppelin-plugins/launcher/k8s-standard/pom.xml
@@ -35,7 +35,7 @@
 
     <properties>
         <plugin.name>Launcher/K8sStandardInterpreterLauncher</plugin.name>
-        <kubernetes.client.version>5.4.1</kubernetes.client.version>
+        <kubernetes.client.version>5.12.4</kubernetes.client.version>
         <jinjava.version>2.5.4</jinjava.version>
     </properties>
 
@@ -74,6 +74,11 @@
             <version>${kubernetes.client.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
 
b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index 17a8b89027..1d9abd8b1b 100644
--- 
a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ 
b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -21,13 +21,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
+import static org.awaitility.Awaitility.await;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import java.time.Instant;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -79,6 +80,7 @@ class K8sRemoteInterpreterProcessTest {
     assertEquals("12321:12321", intp.getInterpreterPortRange());
     assertEquals(22321, intp.getSparkDriverPort());
     assertEquals(22322, intp.getSparkBlockManagerPort());
+    intp.close();
   }
 
   @Test
@@ -130,6 +132,7 @@ class K8sRemoteInterpreterProcessTest {
     envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs");
     assertTrue(envs.containsKey("SERVICE_DOMAIN"));
     assertTrue(envs.containsKey("ZEPPELIN_HOME"));
+    intp.close();
   }
 
   @Test
@@ -189,6 +192,7 @@ class K8sRemoteInterpreterProcessTest {
     assertTrue(zeppelinSparkConf.contains("spark.jars.ivy=my_ivy_path"));
     assertFalse(zeppelinSparkConf.contains("--proxy-user"));
     assertTrue(intp.isSpark());
+    intp.close();
   }
 
   @Test
@@ -242,6 +246,7 @@ class K8sRemoteInterpreterProcessTest {
     assertTrue(zeppelinSparkConf.contains("spark.blockManager.port=" + 
intp.getSparkBlockManagerPort()));
     assertTrue(zeppelinSparkConf.contains("--proxy-user|mytestUser"));
     assertTrue(intp.isSpark());
+    intp.close();
   }
 
   @Test
@@ -286,6 +291,7 @@ class K8sRemoteInterpreterProcessTest {
     String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS");
     assertFalse(sparkSubmitOptions.contains("--proxy-user"));
     assertTrue(intp.isSpark());
+    intp.close();
   }
 
   @Test
@@ -329,6 +335,7 @@ class K8sRemoteInterpreterProcessTest {
             4040,
             "zeppelin-server",
             "my.domain.com"));
+    intp.close();
   }
 
   @Test
@@ -365,6 +372,7 @@ class K8sRemoteInterpreterProcessTest {
     // then
     assertEquals("1", p.get("zeppelin.k8s.interpreter.cores"));
     assertEquals("1408Mi", p.get("zeppelin.k8s.interpreter.memory"));
+    intp.close();
   }
 
   @Test
@@ -475,16 +483,14 @@ class K8sRemoteInterpreterProcessTest {
     service
         .submit(podStatusSimulator);
     // should throw an IOException
-    try {
+    IOException e = assertThrows(IOException.class, () -> {
       intp.start("TestUser");
-      fail("We excepting an IOException");
-    } catch (IOException e) {
-      assertNotNull(e);
-      // Check that the Pod is deleted
-      assertNull(
+    });
+    assertNotNull(e);
+    // Check that the Pod is deleted
+    assertNull(
         
client.pods().inNamespace(intp.getInterpreterNamespace()).withName(intp.getPodName())
               .get());
-    }
   }
 
   @Test
@@ -525,9 +531,7 @@ class K8sRemoteInterpreterProcessTest {
     service.submit(() -> {
       try {
         intp.start("TestUser");
-        fail("We interrupt, this line of code should not be executed.");
       } catch (IOException e) {
-        fail("We interrupt, this line of code should not be executed.");
       }
     });
     // wait a little bit
@@ -575,34 +579,27 @@ class K8sRemoteInterpreterProcessTest {
 
     @Override
     public void run() {
-      try {
-        Instant timeoutTime = Instant.now().plusSeconds(10);
-        while (timeoutTime.isAfter(Instant.now())) {
-          Pod pod = 
client.pods().inNamespace(namespace).withName(podName).get();
-          if (pod != null) {
-            TimeUnit.SECONDS.sleep(1);
-            // Update Pod to "pending" phase
-            pod.setStatus(new PodStatus(null, null, null, null, null, null, 
null, firstPhase,
-                null,
-                null, null, null, null));
-            client.pods().inNamespace(namespace).updateStatus(pod);
-            // Update Pod to "Running" phase
-            pod.setStatus(new PodStatus(null, null, null, null, null, null, 
null, secondPhase,
-                null,
-                null, null, null, null));
-            client.pods().inNamespace(namespace).updateStatus(pod);
-            TimeUnit.SECONDS.sleep(1);
-            if (successfulStart) {
-              process.processStarted(12320, "testing");
-            }
-            break;
-          } else {
-            TimeUnit.MILLISECONDS.sleep(100);
-          }
-        }
-      } catch (InterruptedException e) {
-        // Do nothing
+      await().until(() -> 
client.pods().inNamespace(namespace).withName(podName).get() != null);
+      // Pod is present set first phase
+      Pod pod = client.pods().inNamespace(namespace).withName(podName).get();
+      pod.setStatus(new PodStatus(null, null, null, null, null, null, null, 
firstPhase,
+          null,
+          null, null, null, null));
+      client.pods().inNamespace(namespace).replaceStatus(pod);
+      await().pollDelay(Duration.ofMillis(200)).until(() -> firstPhase.equals(
+          
client.pods().inNamespace(namespace).withName(podName).get().getStatus().getPhase()));
+      // Set second Phase
+      pod = client.pods().inNamespace(namespace).withName(podName).get();
+      pod.setStatus(new PodStatus(null, null, null, null, null, null, null, 
secondPhase,
+          null,
+          null, null, null, null));
+      client.pods().inNamespace(namespace).replaceStatus(pod);
+      await().pollDelay(Duration.ofMillis(200)).until(() -> secondPhase.equals(
+          
client.pods().inNamespace(namespace).withName(podName).get().getStatus().getPhase()));
+      if (successfulStart) {
+        process.processStarted(12320, "testing");
       }
     }
   }
+
 }
diff --git 
a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java
 
b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java
index cf9849c035..160c191ff3 100644
--- 
a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java
+++ 
b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java
@@ -99,6 +99,7 @@ class K8sStandardInterpreterLauncherTest {
     K8sRemoteInterpreterProcess process = (K8sRemoteInterpreterProcess) client;
     assertTrue(process.isSpark());
     
assertTrue(process.prepareZeppelinSparkConf(context.getUserName()).contains("--proxy-user|user1"));
+    process.close();
   }
 
   @Test
@@ -132,5 +133,6 @@ class K8sStandardInterpreterLauncherTest {
     K8sRemoteInterpreterProcess process = (K8sRemoteInterpreterProcess) client;
     assertTrue(process.isSpark());
     
assertFalse(process.prepareZeppelinSparkConf(context.getUserName()).contains("--proxy-user
 user1"));
+    process.close();
   }
 }
diff --git 
a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java
 
b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java
index e8126f006c..f50a54a076 100644
--- 
a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java
+++ 
b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java
@@ -22,7 +22,10 @@ package org.apache.zeppelin.interpreter.launcher;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.awaitility.Awaitility.await;
 
+import java.time.Duration;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
@@ -46,7 +49,9 @@ class PodPhaseWatcherTest {
   void testPhase() throws InterruptedException {
     // CREATE
     client.pods().inNamespace("ns1")
-        .create(new 
PodBuilder().withNewMetadata().withName("pod1").endMetadata().build());
+        .create(new 
PodBuilder().withNewMetadata().withName("pod1").endMetadata().withNewStatus()
+            .endStatus().build());
+    await().until(isPodAvailable("pod1"));
     // READ
     PodList podList = client.pods().inNamespace("ns1").list();
     assertNotNull(podList);
@@ -56,23 +61,33 @@ class PodPhaseWatcherTest {
     PodPhaseWatcher podWatcher = new PodPhaseWatcher(
         phase -> StringUtils.equalsAnyIgnoreCase(phase, "Succeeded", "Failed", 
"Running"));
     try (Watch watch = 
client.pods().inNamespace("ns1").withName("pod1").watch(podWatcher)) {
-
       // Update Pod to "pending" phase
       pod.setStatus(new PodStatus(null, null, null, null, null, null, null, 
"Pending", null, null,
               null, null, null));
-      pod = client.pods().inNamespace("ns1").updateStatus(pod);
+      pod = client.pods().inNamespace("ns1").replaceStatus(pod);
 
       // Wait a little bit, till update is applied
-      Thread.sleep(1000);
+      await().pollDelay(Duration.ofSeconds(1))
+          .until(isPodPhase(pod.getMetadata().getName(), "Pending"));
       // Update Pod to "Running" phase
       pod.setStatus(new PodStatusBuilder(new PodStatus(null, null, null, null, 
null, null, null,
               "Running", null, null, null, null, null)).build());
-      client.pods().inNamespace("ns1").updateStatus(pod);
-
+      client.pods().inNamespace("ns1").replaceStatus(pod);
+      await().pollDelay(Duration.ofSeconds(1))
+          .until(isPodPhase(pod.getMetadata().getName(), "Running"));
       assertTrue(podWatcher.getCountDownLatch().await(1, TimeUnit.SECONDS));
     }
   }
 
+  private Callable<Boolean> isPodPhase(String pod, String phase) {
+    return () -> phase
+        
.equals(client.pods().inNamespace("ns1").withName(pod).get().getStatus().getPhase());
+  }
+
+  private Callable<Boolean> isPodAvailable(String pod) {
+    return () -> client.pods().inNamespace("ns1").withName(pod).get() != null;
+  }
+
   @Test
   void testPhaseWithError() throws InterruptedException {
     // CREATE

Reply via email to