This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 395c8798c39 handle rate limiting of k8s api server in k8s executor (#64504)
395c8798c39 is described below

commit 395c8798c39b5c8f7326581ccf2686578f7d71dc
Author: Nataneljpwd <[email protected]>
AuthorDate: Wed Apr 1 19:17:18 2026 +0300

    handle rate limiting of k8s api server in k8s executor (#64504)
    
    * handle rate limiting of k8s api server in k8s executor
    
    * fix mypy
    
    * fixed possible type issues
    
    ---------
    
    Co-authored-by: Natanel Rudyuklakir <[email protected]>
---
 .../kubernetes/executors/kubernetes_executor.py    |  29 ++++-
 .../executors/test_kubernetes_executor.py          | 139 ++++++++++++++++++---
 2 files changed, 146 insertions(+), 22 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 42dccccb8e4..8320c566e0b 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -31,7 +31,7 @@ import multiprocessing
 import time
 from collections import Counter, defaultdict
 from contextlib import suppress
-from datetime import datetime
+from datetime import datetime, timedelta
 from queue import Empty, Queue
 from typing import TYPE_CHECKING, Any
 
@@ -115,6 +115,7 @@ class KubernetesExecutor(BaseExecutor):
             "kubernetes_executor", "task_publish_max_retries", fallback=0
         )
         self.completed: set[KubernetesResults] = set()
+        self.create_pods_after: datetime | None = None
 
     def _list_pods(self, query_kwargs):
         query_kwargs["header_params"] = {
@@ -312,6 +313,12 @@ class KubernetesExecutor(BaseExecutor):
 
         from kubernetes.client.rest import ApiException
 
+        if self.create_pods_after and self.create_pods_after > datetime.now():
+            self.log.warning("Skipping pod creation due to kubernetes rate 
limit")
+            return
+
+        self.create_pods_after = None
+
         with contextlib.suppress(Empty):
             for _ in range(self.kube_config.worker_pods_creation_batch_size):
                 task = self.task_queue.get_nowait()
@@ -338,15 +345,21 @@ class KubernetesExecutor(BaseExecutor):
                         # Use the body directly as the message instead.
                         body = {"message": e.body}
 
+                    headers = e.headers or {}
                     retries = self.task_publish_retries[key]
                     # In case of exceeded quota or conflict errors, requeue 
the task as per the task_publish_max_retries
+                    # In case of a rate limit, wait and do not create new pods 
for "Retry-After" seconds
+                    can_retry_publish = (
+                        self.task_publish_max_retries == -1 or retries < 
self.task_publish_max_retries
+                    )
                     message = body.get("message", "")
                     if (
                         (str(e.status) == "403" and "exceeded quota" in 
message)
                         or (str(e.status) == "409" and "object has been 
modified" in message)
                         or (str(e.status) == "410" and "too old resource 
version" in message)
                         or str(e.status) == "500"
-                    ) and (self.task_publish_max_retries == -1 or retries < 
self.task_publish_max_retries):
+                        or str(e.status) == "429"
+                    ) and can_retry_publish:
                         self.log.warning(
                             "[Try %s of %s] Kube ApiException for Task: (%s). 
Reason: %r. Message: %s",
                             self.task_publish_retries[key] + 1,
@@ -355,8 +368,20 @@ class KubernetesExecutor(BaseExecutor):
                             e.reason,
                             message,
                         )
+
                         self.task_queue.put(task)
                         self.task_publish_retries[key] = retries + 1
+
+                        if str(e.status) == "429":
+                            self.create_pods_after = datetime.now() + 
timedelta(
+                                seconds=int(headers.get("Retry-After", "0"))
+                            )
+                            self.log.warning(
+                                "Got rate limit from k8s api, skipping pod 
creation until %s",
+                                self.create_pods_after,
+                            )
+                            # stop pod creation to stop api requests
+                            break
                     else:
                         self.log.error("Pod creation failed with reason %r. 
Failing task", e.reason)
                         key = task.key
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index 0afb2836e96..74fb8206c7a 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -20,7 +20,7 @@ import random
 import re
 import string
 import time
-from datetime import datetime
+from datetime import datetime, timedelta
 from unittest import mock
 
 import pytest
@@ -275,13 +275,14 @@ class TestKubernetesExecutor:
         AirflowKubernetesScheduler is None, reason="kubernetes python package 
is not installed"
     )
     @pytest.mark.parametrize(
-        ("response", "task_publish_max_retries", "should_requeue", 
"task_expected_state"),
+        ("response", "task_publish_max_retries", "should_requeue", 
"task_expected_state", "retry_delay"),
         [
             pytest.param(
                 HTTPResponse(body='{"message": "any message"}', status=400),
                 0,
                 False,
                 State.FAILED,
+                None,
                 id="400 BadRequest",
             ),
             pytest.param(
@@ -289,6 +290,7 @@ class TestKubernetesExecutor:
                 1,
                 False,
                 State.FAILED,
+                None,
                 id="400 BadRequest (task_publish_max_retries=1)",
             ),
             pytest.param(
@@ -296,6 +298,7 @@ class TestKubernetesExecutor:
                 0,
                 False,
                 State.FAILED,
+                None,
                 id="403 Forbidden (permission denied)",
             ),
             pytest.param(
@@ -303,6 +306,7 @@ class TestKubernetesExecutor:
                 1,
                 False,
                 State.FAILED,
+                None,
                 id="403 Forbidden (permission denied) 
(task_publish_max_retries=1)",
             ),
             pytest.param(
@@ -315,6 +319,7 @@ class TestKubernetesExecutor:
                 0,
                 False,
                 State.FAILED,
+                None,
                 id="403 Forbidden (exceeded quota)",
             ),
             pytest.param(
@@ -327,6 +332,7 @@ class TestKubernetesExecutor:
                 1,
                 True,
                 State.SUCCESS,
+                None,
                 id="403 Forbidden (exceeded quota) 
(task_publish_max_retries=1) (retry succeeded)",
             ),
             pytest.param(
@@ -339,6 +345,7 @@ class TestKubernetesExecutor:
                 1,
                 True,
                 State.FAILED,
+                None,
                 id="403 Forbidden (exceeded quota) 
(task_publish_max_retries=1)  (retry failed)",
             ),
             pytest.param(
@@ -346,6 +353,7 @@ class TestKubernetesExecutor:
                 0,
                 False,
                 State.FAILED,
+                None,
                 id="404 Not Found",
             ),
             pytest.param(
@@ -353,6 +361,7 @@ class TestKubernetesExecutor:
                 1,
                 False,
                 State.FAILED,
+                None,
                 id="404 Not Found (task_publish_max_retries=1)",
             ),
             pytest.param(
@@ -360,6 +369,7 @@ class TestKubernetesExecutor:
                 0,
                 False,
                 State.FAILED,
+                None,
                 id="422 Unprocessable Entity",
             ),
             pytest.param(
@@ -367,6 +377,7 @@ class TestKubernetesExecutor:
                 1,
                 False,
                 State.FAILED,
+                None,
                 id="422 Unprocessable Entity (task_publish_max_retries=1)",
             ),
             pytest.param(
@@ -374,6 +385,7 @@ class TestKubernetesExecutor:
                 0,
                 False,
                 State.FAILED,
+                None,
                 id="12345 fake-unhandled-reason",
             ),
             pytest.param(
@@ -381,6 +393,7 @@ class TestKubernetesExecutor:
                 1,
                 False,
                 State.FAILED,
+                None,
                 id="12345 fake-unhandled-reason (task_publish_max_retries=1) 
(retry succeeded)",
             ),
             pytest.param(
@@ -388,6 +401,7 @@ class TestKubernetesExecutor:
                 1,
                 False,
                 State.FAILED,
+                None,
                 id="12345 fake-unhandled-reason (task_publish_max_retries=1) 
(retry failed)",
             ),
             pytest.param(
@@ -398,6 +412,7 @@ class TestKubernetesExecutor:
                 1,
                 True,
                 State.SUCCESS,
+                None,
                 id="409 conflict",
             ),
             pytest.param(
@@ -408,27 +423,27 @@ class TestKubernetesExecutor:
                 1,
                 True,
                 State.SUCCESS,
+                None,
                 id="410 gone",
             ),
             pytest.param(
-                HTTPResponse(body="Too many requests, please try again 
later.", status=429),
-                0,
-                False,
-                State.FAILED,
-                id="429 Too Many Requests (non-JSON body)",
-            ),
-            pytest.param(
-                HTTPResponse(body="Too many requests, please try again 
later.", status=429),
+                HTTPResponse(
+                    body="Too many requests, please try again later.",
+                    status=429,
+                    headers={"Retry-After": "3"},
+                ),
                 1,
-                False,
-                State.FAILED,
-                id="429 Too Many Requests (non-JSON body) 
(task_publish_max_retries=1)",
+                True,
+                State.SUCCESS,
+                3,
+                id="429 Too Many Requests (non-JSON body) (requeued after 
retry delay)",
             ),
             pytest.param(
                 HTTPResponse(body="", status=429),
                 0,
                 False,
                 State.FAILED,
+                None,
                 id="429 Too Many Requests (empty body)",
             ),
             pytest.param(
@@ -439,6 +454,7 @@ class TestKubernetesExecutor:
                 1,
                 True,
                 State.SUCCESS,
+                None,
                 id="500 Internal Server Error (webhook failure)",
             ),
             pytest.param(
@@ -449,6 +465,7 @@ class TestKubernetesExecutor:
                 1,
                 True,
                 State.FAILED,
+                None,
                 id="500 Internal Server Error (webhook failure) (retry 
failed)",
             ),
         ],
@@ -463,6 +480,7 @@ class TestKubernetesExecutor:
         task_publish_max_retries,
         should_requeue,
         task_expected_state,
+        retry_delay: int | None,
         data_file,
     ):
         """
@@ -491,11 +509,11 @@ class TestKubernetesExecutor:
         template_file = 
data_file("pods/generator_base_with_secrets.yaml").as_posix()
         # A mock kube_client that throws errors when making a pod
         mock_kube_client = mock.patch("kubernetes.client.CoreV1Api", 
autospec=True)
-        mock_kube_client.create_namespaced_pod = 
mock.MagicMock(side_effect=ApiException(http_resp=response))
+        mock_kube_client.create_namespaced_pod = 
mock.MagicMock(side_effect=ApiException(http_resp=response))  # type: 
ignore[attr-defined]
         mock_get_kube_client.return_value = mock_kube_client
         mock_api_client = mock.MagicMock()
         mock_api_client.sanitize_for_serialization.return_value = {}
-        mock_kube_client.api_client = mock_api_client
+        mock_kube_client.api_client = mock_api_client  # type: 
ignore[attr-defined]
         config = {
             ("kubernetes_executor", "pod_template_file"): template_file,
         }
@@ -514,19 +532,23 @@ class TestKubernetesExecutor:
                 )
                 kubernetes_executor.sync()
 
-                assert mock_kube_client.create_namespaced_pod.call_count == 1
+                assert mock_kube_client.create_namespaced_pod.call_count == 1  
# type: ignore[attr-defined]
 
                 if should_requeue:
                     assert not kubernetes_executor.task_queue.empty()
 
                     # Disable the ApiException
-                    if task_expected_state == State.SUCCESS:
-                        mock_kube_client.create_namespaced_pod.side_effect = 
None
+                    if task_expected_state == State.SUCCESS or 
task_expected_state == State.QUEUED:
+                        mock_kube_client.create_namespaced_pod.side_effect = 
None  # type: ignore[attr-defined]
 
                     # Execute the task without errors should empty the queue
-                    mock_kube_client.create_namespaced_pod.reset_mock()
+                    mock_kube_client.create_namespaced_pod.reset_mock()  # 
type: ignore[attr-defined]
+
+                    if retry_delay:
+                        time.sleep(retry_delay + 1)
+
                     kubernetes_executor.sync()
-                    assert mock_kube_client.create_namespaced_pod.called
+                    assert mock_kube_client.create_namespaced_pod.called  # 
type: ignore[attr-defined]
                     assert kubernetes_executor.task_queue.empty()
                     if task_expected_state != State.SUCCESS:
                         assert 
kubernetes_executor.event_buffer[task_instance_key][0] == task_expected_state
@@ -536,6 +558,82 @@ class TestKubernetesExecutor:
             finally:
                 kubernetes_executor.end()
 
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason="kubernetes python package 
is not installed"
+    )
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+    
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+    def test_skip_pod_creation_on_create_pods_after(
+        self,
+        mock_get_kube_client,
+        mock_kubernetes_job_watcher,
+        data_file,
+    ):
+        """
+        Skip pod creation when create_pods_after is in the future
+        """
+
+        template_file = 
data_file("pods/generator_base_with_secrets.yaml").as_posix()
+        # A mock kube_client that throws errors when making a pod
+        mock_kube_client = mock.patch("kubernetes.client.CoreV1Api", 
autospec=True)
+        mock_kube_client.create_namespaced_pod = mock.MagicMock(  # type: 
ignore[attr-defined]
+            side_effect=ApiException(
+                http_resp=HTTPResponse(
+                    body="Too many requests, please try again later.",
+                    status=429,
+                    headers={"Retry-After": "999999"},
+                ),
+            )
+        )
+        mock_get_kube_client.return_value = mock_kube_client
+        mock_api_client = mock.MagicMock()
+        mock_api_client.sanitize_for_serialization.return_value = {}
+        mock_kube_client.api_client = mock_api_client  # type: 
ignore[attr-defined]
+        config = {
+            ("kubernetes_executor", "pod_template_file"): template_file,
+        }
+        with conf_vars(config):
+            kubernetes_executor = self.kubernetes_executor
+            kubernetes_executor.task_publish_max_retries = 1
+            kubernetes_executor.start()
+            try:
+                try_number = 1
+                task_instance_key = TaskInstanceKey("dag", "task", "run_id", 
try_number)
+                kubernetes_executor.execute_async(
+                    key=task_instance_key,
+                    queue=None,
+                    command=["airflow", "tasks", "run", "true", 
"some_parameter"],
+                )
+                kubernetes_executor.sync()
+
+                # There should be only one request to create pod which fails
+                assert mock_kube_client.create_namespaced_pod.call_count == 1  
# type: ignore[attr-defined]
+                # The task is queued
+                assert not kubernetes_executor.task_queue.empty()
+
+                # sync multiple times to make sure that no other pod is trying 
to be created
+                kubernetes_executor.sync()
+                kubernetes_executor.sync()
+                kubernetes_executor.sync()
+
+                assert not kubernetes_executor.task_queue.empty()
+                assert mock_kube_client.create_namespaced_pod.call_count == 1  
# type: ignore[attr-defined]
+
+                kubernetes_executor.create_pods_after = datetime.now() - 
timedelta(hours=1)
+
+                mock_kube_client.create_namespaced_pod.side_effect = None
+                mock_kube_client.create_namespaced_pod.reset_mock()
+
+                kubernetes_executor.sync()
+
+                assert mock_kube_client.create_namespaced_pod.call_count == 1
+
+                assert kubernetes_executor.task_queue.empty()
+                assert kubernetes_executor.event_buffer[task_instance_key][0] 
== State.QUEUED
+
+            finally:
+                kubernetes_executor.end()
+
     @pytest.mark.skipif(
         AirflowKubernetesScheduler is None, reason="kubernetes python package 
is not installed"
     )
@@ -595,6 +693,7 @@ class TestKubernetesExecutor:
         mock_api_client = mock.MagicMock()
         mock_api_client.sanitize_for_serialization.return_value = {}
         mock_kube_client.api_client = mock_api_client
+
         config = {("kubernetes_executor", "pod_template_file"): template_file}
         with conf_vars(config):
             kubernetes_executor = self.kubernetes_executor

Reply via email to