This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 395c8798c39 handle rate limiting of k8s api server in k8s executor (#64504)
395c8798c39 is described below
commit 395c8798c39b5c8f7326581ccf2686578f7d71dc
Author: Nataneljpwd <[email protected]>
AuthorDate: Wed Apr 1 19:17:18 2026 +0300
handle rate limiting of k8s api server in k8s executor (#64504)
* handle rate limiting of k8s api server in k8s executor
* fix mypy
* fixed possible type issues
---------
Co-authored-by: Natanel Rudyuklakir <[email protected]>
---
.../kubernetes/executors/kubernetes_executor.py | 29 ++++-
.../executors/test_kubernetes_executor.py | 139 ++++++++++++++++++---
2 files changed, 146 insertions(+), 22 deletions(-)
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 42dccccb8e4..8320c566e0b 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -31,7 +31,7 @@ import multiprocessing
import time
from collections import Counter, defaultdict
from contextlib import suppress
-from datetime import datetime
+from datetime import datetime, timedelta
from queue import Empty, Queue
from typing import TYPE_CHECKING, Any
@@ -115,6 +115,7 @@ class KubernetesExecutor(BaseExecutor):
"kubernetes_executor", "task_publish_max_retries", fallback=0
)
self.completed: set[KubernetesResults] = set()
+ self.create_pods_after: datetime | None = None
def _list_pods(self, query_kwargs):
query_kwargs["header_params"] = {
@@ -312,6 +313,12 @@ class KubernetesExecutor(BaseExecutor):
from kubernetes.client.rest import ApiException
+ if self.create_pods_after and self.create_pods_after > datetime.now():
+ self.log.warning("Skipping pod creation due to kubernetes rate limit")
+ return
+
+ self.create_pods_after = None
+
with contextlib.suppress(Empty):
for _ in range(self.kube_config.worker_pods_creation_batch_size):
task = self.task_queue.get_nowait()
@@ -338,15 +345,21 @@ class KubernetesExecutor(BaseExecutor):
# Use the body directly as the message instead.
body = {"message": e.body}
+ headers = e.headers or {}
retries = self.task_publish_retries[key]
# In case of exceeded quota or conflict errors, requeue the task as per the task_publish_max_retries
+ # In case of a rate limit, wait and do not create new pods for "Retry-After" seconds
+ can_retry_publish = (
+ self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries
+ )
message = body.get("message", "")
if (
(str(e.status) == "403" and "exceeded quota" in message)
or (str(e.status) == "409" and "object has been modified" in message)
or (str(e.status) == "410" and "too old resource version" in message)
or str(e.status) == "500"
- ) and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries):
+ or str(e.status) == "429"
+ ) and can_retry_publish:
self.log.warning(
"[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s",
self.task_publish_retries[key] + 1,
@@ -355,8 +368,20 @@ class KubernetesExecutor(BaseExecutor):
e.reason,
message,
)
+
self.task_queue.put(task)
self.task_publish_retries[key] = retries + 1
+
+ if str(e.status) == "429":
+ self.create_pods_after = datetime.now() + timedelta(
+ seconds=int(headers.get("Retry-After", "0"))
+ )
+ self.log.warning(
+ "Got rate limit from k8s api, skipping pod creation until %s",
+ self.create_pods_after,
+ )
+ # stop pod creation to stop api requests
+ break
else:
self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
key = task.key
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index 0afb2836e96..74fb8206c7a 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -20,7 +20,7 @@ import random
import re
import string
import time
-from datetime import datetime
+from datetime import datetime, timedelta
from unittest import mock
import pytest
@@ -275,13 +275,14 @@ class TestKubernetesExecutor:
AirflowKubernetesScheduler is None, reason="kubernetes python package
is not installed"
)
@pytest.mark.parametrize(
- ("response", "task_publish_max_retries", "should_requeue",
"task_expected_state"),
+ ("response", "task_publish_max_retries", "should_requeue",
"task_expected_state", "retry_delay"),
[
pytest.param(
HTTPResponse(body='{"message": "any message"}', status=400),
0,
False,
State.FAILED,
+ None,
id="400 BadRequest",
),
pytest.param(
@@ -289,6 +290,7 @@ class TestKubernetesExecutor:
1,
False,
State.FAILED,
+ None,
id="400 BadRequest (task_publish_max_retries=1)",
),
pytest.param(
@@ -296,6 +298,7 @@ class TestKubernetesExecutor:
0,
False,
State.FAILED,
+ None,
id="403 Forbidden (permission denied)",
),
pytest.param(
@@ -303,6 +306,7 @@ class TestKubernetesExecutor:
1,
False,
State.FAILED,
+ None,
id="403 Forbidden (permission denied)
(task_publish_max_retries=1)",
),
pytest.param(
@@ -315,6 +319,7 @@ class TestKubernetesExecutor:
0,
False,
State.FAILED,
+ None,
id="403 Forbidden (exceeded quota)",
),
pytest.param(
@@ -327,6 +332,7 @@ class TestKubernetesExecutor:
1,
True,
State.SUCCESS,
+ None,
id="403 Forbidden (exceeded quota)
(task_publish_max_retries=1) (retry succeeded)",
),
pytest.param(
@@ -339,6 +345,7 @@ class TestKubernetesExecutor:
1,
True,
State.FAILED,
+ None,
id="403 Forbidden (exceeded quota)
(task_publish_max_retries=1) (retry failed)",
),
pytest.param(
@@ -346,6 +353,7 @@ class TestKubernetesExecutor:
0,
False,
State.FAILED,
+ None,
id="404 Not Found",
),
pytest.param(
@@ -353,6 +361,7 @@ class TestKubernetesExecutor:
1,
False,
State.FAILED,
+ None,
id="404 Not Found (task_publish_max_retries=1)",
),
pytest.param(
@@ -360,6 +369,7 @@ class TestKubernetesExecutor:
0,
False,
State.FAILED,
+ None,
id="422 Unprocessable Entity",
),
pytest.param(
@@ -367,6 +377,7 @@ class TestKubernetesExecutor:
1,
False,
State.FAILED,
+ None,
id="422 Unprocessable Entity (task_publish_max_retries=1)",
),
pytest.param(
@@ -374,6 +385,7 @@ class TestKubernetesExecutor:
0,
False,
State.FAILED,
+ None,
id="12345 fake-unhandled-reason",
),
pytest.param(
@@ -381,6 +393,7 @@ class TestKubernetesExecutor:
1,
False,
State.FAILED,
+ None,
id="12345 fake-unhandled-reason (task_publish_max_retries=1)
(retry succeeded)",
),
pytest.param(
@@ -388,6 +401,7 @@ class TestKubernetesExecutor:
1,
False,
State.FAILED,
+ None,
id="12345 fake-unhandled-reason (task_publish_max_retries=1)
(retry failed)",
),
pytest.param(
@@ -398,6 +412,7 @@ class TestKubernetesExecutor:
1,
True,
State.SUCCESS,
+ None,
id="409 conflict",
),
pytest.param(
@@ -408,27 +423,27 @@ class TestKubernetesExecutor:
1,
True,
State.SUCCESS,
+ None,
id="410 gone",
),
pytest.param(
- HTTPResponse(body="Too many requests, please try again
later.", status=429),
- 0,
- False,
- State.FAILED,
- id="429 Too Many Requests (non-JSON body)",
- ),
- pytest.param(
- HTTPResponse(body="Too many requests, please try again
later.", status=429),
+ HTTPResponse(
+ body="Too many requests, please try again later.",
+ status=429,
+ headers={"Retry-After": "3"},
+ ),
1,
- False,
- State.FAILED,
- id="429 Too Many Requests (non-JSON body)
(task_publish_max_retries=1)",
+ True,
+ State.SUCCESS,
+ 3,
+ id="429 Too Many Requests (non-JSON body) (requeued after
retry delay)",
),
pytest.param(
HTTPResponse(body="", status=429),
0,
False,
State.FAILED,
+ None,
id="429 Too Many Requests (empty body)",
),
pytest.param(
@@ -439,6 +454,7 @@ class TestKubernetesExecutor:
1,
True,
State.SUCCESS,
+ None,
id="500 Internal Server Error (webhook failure)",
),
pytest.param(
@@ -449,6 +465,7 @@ class TestKubernetesExecutor:
1,
True,
State.FAILED,
+ None,
id="500 Internal Server Error (webhook failure) (retry
failed)",
),
],
@@ -463,6 +480,7 @@ class TestKubernetesExecutor:
task_publish_max_retries,
should_requeue,
task_expected_state,
+ retry_delay: int | None,
data_file,
):
"""
@@ -491,11 +509,11 @@ class TestKubernetesExecutor:
template_file =
data_file("pods/generator_base_with_secrets.yaml").as_posix()
# A mock kube_client that throws errors when making a pod
mock_kube_client = mock.patch("kubernetes.client.CoreV1Api",
autospec=True)
- mock_kube_client.create_namespaced_pod =
mock.MagicMock(side_effect=ApiException(http_resp=response))
+ mock_kube_client.create_namespaced_pod =
mock.MagicMock(side_effect=ApiException(http_resp=response)) # type:
ignore[attr-defined]
mock_get_kube_client.return_value = mock_kube_client
mock_api_client = mock.MagicMock()
mock_api_client.sanitize_for_serialization.return_value = {}
- mock_kube_client.api_client = mock_api_client
+ mock_kube_client.api_client = mock_api_client # type:
ignore[attr-defined]
config = {
("kubernetes_executor", "pod_template_file"): template_file,
}
@@ -514,19 +532,23 @@ class TestKubernetesExecutor:
)
kubernetes_executor.sync()
- assert mock_kube_client.create_namespaced_pod.call_count == 1
+ assert mock_kube_client.create_namespaced_pod.call_count == 1
# type: ignore[attr-defined]
if should_requeue:
assert not kubernetes_executor.task_queue.empty()
# Disable the ApiException
- if task_expected_state == State.SUCCESS:
- mock_kube_client.create_namespaced_pod.side_effect =
None
+ if task_expected_state == State.SUCCESS or
task_expected_state == State.QUEUED:
+ mock_kube_client.create_namespaced_pod.side_effect =
None # type: ignore[attr-defined]
# Execute the task without errors should empty the queue
- mock_kube_client.create_namespaced_pod.reset_mock()
+ mock_kube_client.create_namespaced_pod.reset_mock() #
type: ignore[attr-defined]
+
+ if retry_delay:
+ time.sleep(retry_delay + 1)
+
kubernetes_executor.sync()
- assert mock_kube_client.create_namespaced_pod.called
+ assert mock_kube_client.create_namespaced_pod.called #
type: ignore[attr-defined]
assert kubernetes_executor.task_queue.empty()
if task_expected_state != State.SUCCESS:
assert
kubernetes_executor.event_buffer[task_instance_key][0] == task_expected_state
@@ -536,6 +558,82 @@ class TestKubernetesExecutor:
finally:
kubernetes_executor.end()
+ @pytest.mark.skipif(
+ AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
+ )
+ @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+ @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+ def test_skip_pod_creation_on_create_pods_after(
+ self,
+ mock_get_kube_client,
+ mock_kubernetes_job_watcher,
+ data_file,
+ ):
+ """
+ Skip pod creation when create_pods_after is in the future
+ """
+
+ template_file =
data_file("pods/generator_base_with_secrets.yaml").as_posix()
+ # A mock kube_client that throws errors when making a pod
+ mock_kube_client = mock.patch("kubernetes.client.CoreV1Api",
autospec=True)
+ mock_kube_client.create_namespaced_pod = mock.MagicMock( # type:
ignore[attr-defined]
+ side_effect=ApiException(
+ http_resp=HTTPResponse(
+ body="Too many requests, please try again later.",
+ status=429,
+ headers={"Retry-After": "999999"},
+ ),
+ )
+ )
+ mock_get_kube_client.return_value = mock_kube_client
+ mock_api_client = mock.MagicMock()
+ mock_api_client.sanitize_for_serialization.return_value = {}
+ mock_kube_client.api_client = mock_api_client # type:
ignore[attr-defined]
+ config = {
+ ("kubernetes_executor", "pod_template_file"): template_file,
+ }
+ with conf_vars(config):
+ kubernetes_executor = self.kubernetes_executor
+ kubernetes_executor.task_publish_max_retries = 1
+ kubernetes_executor.start()
+ try:
+ try_number = 1
+ task_instance_key = TaskInstanceKey("dag", "task", "run_id",
try_number)
+ kubernetes_executor.execute_async(
+ key=task_instance_key,
+ queue=None,
+ command=["airflow", "tasks", "run", "true",
"some_parameter"],
+ )
+ kubernetes_executor.sync()
+
+ # There should be only one request to create pod which fails
+ assert mock_kube_client.create_namespaced_pod.call_count == 1
# type: ignore[attr-defined]
+ # The task is queued
+ assert not kubernetes_executor.task_queue.empty()
+
+ # sync multiple times to make sure that no other pod is trying
to be created
+ kubernetes_executor.sync()
+ kubernetes_executor.sync()
+ kubernetes_executor.sync()
+
+ assert not kubernetes_executor.task_queue.empty()
+ assert mock_kube_client.create_namespaced_pod.call_count == 1
# type: ignore[attr-defined]
+
+ kubernetes_executor.create_pods_after = datetime.now() -
timedelta(hours=1)
+
+ mock_kube_client.create_namespaced_pod.side_effect = None
+ mock_kube_client.create_namespaced_pod.reset_mock()
+
+ kubernetes_executor.sync()
+
+ assert mock_kube_client.create_namespaced_pod.call_count == 1
+
+ assert kubernetes_executor.task_queue.empty()
+ assert kubernetes_executor.event_buffer[task_instance_key][0]
== State.QUEUED
+
+ finally:
+ kubernetes_executor.end()
+
@pytest.mark.skipif(
AirflowKubernetesScheduler is None, reason="kubernetes python package
is not installed"
)
@@ -595,6 +693,7 @@ class TestKubernetesExecutor:
mock_api_client = mock.MagicMock()
mock_api_client.sanitize_for_serialization.return_value = {}
mock_kube_client.api_client = mock_api_client
+
config = {("kubernetes_executor", "pod_template_file"): template_file}
with conf_vars(config):
kubernetes_executor = self.kubernetes_executor