This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 478a2b458ba Kill kube watcher instance if it doesn't terminate
gracefully in 60 seconds (#52662)
478a2b458ba is described below
commit 478a2b458bae57a04aebcda3ac5b231fb49ff764
Author: Pavan Sharma <[email protected]>
AuthorDate: Wed Jul 30 15:19:24 2025 +0530
Kill kube watcher instance if it doesn't terminate gracefully in 60 seconds
(#52662)
* Kill kube watcher instance if it doesn't terminate gracefully in 60 seconds
* suggested changes
---------
Co-authored-by: Jarek Potiuk <[email protected]>
---
.../cncf/kubernetes/executors/kubernetes_executor_utils.py | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index b7e03459d60..122cfd9f8fa 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -530,7 +530,17 @@ class AirflowKubernetesScheduler(LoggingMixin):
self.log.debug("Terminating kube_watchers...")
for kube_watcher in self.kube_watchers.values():
kube_watcher.terminate()
- kube_watcher.join()
+ self.log.debug("kube_watcher=%s", kube_watcher)
+
+ # for now 20 seconds is max wait time for kube watchers to terminate.
+ max_wait_time = 20
+ start_time = time.time()
+ for kube_watcher in self.kube_watchers.values():
+ kube_watcher.join(timeout=max(int(max_wait_time - (time.time() - start_time)), 0))
+ if kube_watcher.is_alive():
+ self.log.warning("kube_watcher didn't terminate in time=%s", kube_watcher)
+ kube_watcher.kill()
+ kube_watcher.join()
self.log.debug("kube_watcher=%s", kube_watcher)
self.log.debug("Flushing watcher_queue...")
self._flush_watcher_queue()