This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 478a2b458ba Kill kube watcher instance if it doesn't terminate gracefully in 60 seconds (#52662)
478a2b458ba is described below

commit 478a2b458bae57a04aebcda3ac5b231fb49ff764
Author: Pavan Sharma <[email protected]>
AuthorDate: Wed Jul 30 15:19:24 2025 +0530

    Kill kube watcher instance if it doesn't terminate gracefully in 60 seconds (#52662)
    
    * Kill kube watcher instance if it doesn't terminate gracefully in 60 seconds
    
    * suggested changes
    
    ---------
    
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 .../cncf/kubernetes/executors/kubernetes_executor_utils.py   | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index b7e03459d60..122cfd9f8fa 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -530,7 +530,17 @@ class AirflowKubernetesScheduler(LoggingMixin):
         self.log.debug("Terminating kube_watchers...")
         for kube_watcher in self.kube_watchers.values():
             kube_watcher.terminate()
-            kube_watcher.join()
+            self.log.debug("kube_watcher=%s", kube_watcher)
+
+        # for now 20 seconds is max wait time for kube watchers to terminate.
+        max_wait_time = 20
+        start_time = time.time()
+        for kube_watcher in self.kube_watchers.values():
+            kube_watcher.join(timeout=max(int(max_wait_time - (time.time() - 
start_time)), 0))
+            if kube_watcher.is_alive():
+                self.log.warning("kube_watcher didn't terminate in time=%s", 
kube_watcher)
+                kube_watcher.kill()
+                kube_watcher.join()
             self.log.debug("kube_watcher=%s", kube_watcher)
         self.log.debug("Flushing watcher_queue...")
         self._flush_watcher_queue()

Reply via email to