This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 95a0f43 [ZEPPELIN-5270] Implementing a soft shutdown for K8s interpreters 95a0f43 is described below commit 95a0f43c57deae12f9cef436d5e1560fd533ee58 Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Thu Jan 28 10:47:07 2021 +0100 [ZEPPELIN-5270] Implementing a soft shutdown for K8s interpreters ### What is this PR for? This PR adds a soft shutdown for K8s interpreters. It also changes the implementation from polling the pod phase with multiple API calls to a watcher implementation. ### What type of PR is it? - Improvement ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5270 ### How should this be tested? * CI ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Philipp Dallig <philipp.dal...@gmail.com> Closes #4064 from Reamer/k8s_potwatcher and squashes the following commits: 95ebd59bd [Philipp Dallig] Implement softshutdown in K8s with K8s watcher --- .../launcher/K8sRemoteInterpreterProcess.java | 35 +++++--- .../interpreter/launcher/PodPhaseWatcher.java | 64 +++++++++++++++ .../interpreter/launcher/PodPhaseWatcherTest.java | 92 ++++++++++++++++++++++ 3 files changed, 181 insertions(+), 10 deletions(-) diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java index e0a87fa..e44a5af 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import 
java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.io.IOUtils; @@ -32,6 +33,7 @@ import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +48,7 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodStatus; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.LocalPortForward; +import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.dsl.ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable; public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess { @@ -143,16 +146,16 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess // special handling if we doesn't want timeout the process during lifecycle phase pending if (!timeoutDuringPending) { - while (!StringUtils.equalsAnyIgnoreCase(getPodPhase(), "Succeeded", "Failed", "Running") - && !Thread.currentThread().isInterrupted()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - LOGGER.error("Interrupt received during pending phase. 
Try to stop the interpreter and interrupt the current thread.", e); - processStopped("Start process was interrupted during the pending phase"); - stop(); - Thread.currentThread().interrupt(); - } + // WATCH + PodPhaseWatcher podWatcher = new PodPhaseWatcher( + phase -> StringUtils.equalsAnyIgnoreCase(phase, "Succeeded", "Failed", "Running")); + try (Watch watch = client.pods().inNamespace(namespace).withName(podName).watch(podWatcher)) { + podWatcher.getCountDownLatch().await(); + } catch (InterruptedException e) { + LOGGER.error("Interrupt received during waiting for Running phase. Try to stop the interpreter and interrupt the current thread.", e); + processStopped("Start process was interrupted during waiting for Running phase"); + stop(); + Thread.currentThread().interrupt(); } } @@ -183,6 +186,18 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess @Override public void stop() { super.stop(); + // WATCH for soft shutdown + PodPhaseWatcher podWatcher = new PodPhaseWatcher(phase -> StringUtils.equalsAny(phase, "Succeeded", "Failed")); + try (Watch watch = client.pods().inNamespace(namespace).withName(podName).watch(podWatcher)) { + if (!podWatcher.getCountDownLatch().await(RemoteInterpreterServer.DEFAULT_SHUTDOWN_TIMEOUT + 500, + TimeUnit.MILLISECONDS)) { + LOGGER.warn("Pod {} doesn't terminate in time", podName); + } + } catch (InterruptedException e) { + LOGGER.error("Interruption received while waiting for stop.", e); + processStopped("Stop process was interrupted during termination"); + Thread.currentThread().interrupt(); + } Properties templateProperties = getTemplateBindings(null); // delete pod try { diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcher.java new file mode 100644 index 0000000..ddf23f5 --- /dev/null +++ 
b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcher.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.launcher; + +import java.util.concurrent.CountDownLatch; +import java.util.function.Predicate; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodStatus; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watcher; + +public class PodPhaseWatcher implements Watcher<Pod> { + private static final Logger LOGGER = LoggerFactory.getLogger(PodPhaseWatcher.class); + private final CountDownLatch countDownLatch; + private final Predicate<String> predicate; + + public PodPhaseWatcher(Predicate<String> predicate) { + this.countDownLatch = new CountDownLatch(1); + this.predicate = predicate; + } + + @Override + public void eventReceived(Action action, Pod pod) { + PodStatus status = pod.getStatus(); + if (status != null && predicate.test(status.getPhase())) { + LOGGER.info("Pod {} meets phase {}", pod.getMetadata().getName(), status.getPhase()); + 
countDownLatch.countDown(); + } + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOGGER.error("PodWatcher exits abnormally", cause); + } + // always count down, so threads that are waiting will continue + countDownLatch.countDown(); + } + + public CountDownLatch getCountDownLatch() { + return countDownLatch; + } + +} diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java new file mode 100644 index 0000000..fb06768 --- /dev/null +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.interpreter.launcher; + +import static org.junit.Assert.*; + +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.junit.Rule; +import org.junit.Test; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodStatus; +import io.fabric8.kubernetes.api.model.PodStatusBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.Watch; +import io.fabric8.kubernetes.client.server.mock.KubernetesServer; + +public class PodPhaseWatcherTest { + + @Rule + public KubernetesServer server = new KubernetesServer(true, true); + + @Test + public void testPhase() throws InterruptedException { + KubernetesClient client = server.getClient(); + // CREATE + client.pods().inNamespace("ns1") + .create(new PodBuilder().withNewMetadata().withName("pod1").endMetadata().build()); + // READ + PodList podList = client.pods().inNamespace("ns1").list(); + assertNotNull(podList); + assertEquals(1, podList.getItems().size()); + Pod pod = podList.getItems().get(0); + // WATCH + PodPhaseWatcher podWatcher = new PodPhaseWatcher( + phase -> StringUtils.equalsAnyIgnoreCase(phase, "Succeeded", "Failed", "Running")); + Watch watch = client.pods().inNamespace("ns1").withName("pod1").watch(podWatcher); + + // Update Pod to "pending" phase + pod.setStatus(new PodStatus(null, null, null, null, null, null, null, "Pending", null, null, + null, null, null)); + client.pods().inNamespace("ns1").updateStatus(pod); + + // Update Pod to "Running" phase + pod.setStatus(new PodStatusBuilder(new PodStatus(null, null, null, null, null, null, null, + "Running", null, null, null, null, null)).build()); + client.pods().inNamespace("ns1").updateStatus(pod); + + assertTrue(podWatcher.getCountDownLatch().await(1, TimeUnit.SECONDS)); + watch.close(); + } + + @Test + public 
void testPhaseWithError() throws InterruptedException { + KubernetesClient client = server.getClient(); + // CREATE + client.pods().inNamespace("ns1") + .create(new PodBuilder().withNewMetadata().withName("pod1").endMetadata().build()); + // READ + PodList podList = client.pods().inNamespace("ns1").list(); + assertNotNull(podList); + assertEquals(1, podList.getItems().size()); + // WATCH + PodPhaseWatcher podWatcher = new PodPhaseWatcher( + phase -> StringUtils.equalsAnyIgnoreCase(phase, "Succeeded", "Failed", "Running")); + Watch watch = client.pods().inNamespace("ns1").withName("pod1").watch(podWatcher); + + // In the case of close, we do not block thread execution + watch.close(); + assertTrue(podWatcher.getCountDownLatch().await(1, TimeUnit.SECONDS)); + } +}