This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e9e6ac27a4b Handle invalid execution API urls gracefully in supervisor
(#53082)
e9e6ac27a4b is described below
commit e9e6ac27a4ba434fefc024b0b6430fb319c609e5
Author: Amogh Desai <[email protected]>
AuthorDate: Sat Jul 19 10:58:00 2025 +0530
Handle invalid execution API urls gracefully in supervisor (#53082)
---
.../src/airflow/sdk/execution_time/supervisor.py | 35 +++++++++++++-
task-sdk/tests/conftest.py | 10 ++++
.../task_sdk/execution_time/test_supervisor.py | 53 ++++++++++++++++++++++
3 files changed, 96 insertions(+), 2 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 798cdae560d..e36b3667e2e 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -42,6 +42,7 @@ from typing import (
TextIO,
cast,
)
+from urllib.parse import urlparse
from uuid import UUID
import attrs
@@ -1650,12 +1651,42 @@ def supervise(
:param subprocess_logs_to_stdout: Should task logs also be sent to stdout
via the main logger.
:param client: Optional preconfigured client for communication with the
server (Mostly for tests).
:return: Exit code of the process.
+ :raises ValueError: If server URL is empty or invalid.
"""
# One or the other
from airflow.sdk.execution_time.secrets_masker import reset_secrets_masker
- if not client and ((not server) ^ dry_run):
- raise ValueError(f"Can only specify one of {server=} or {dry_run=}")
+ if not client:
+ if dry_run and server:
+ raise ValueError(f"Can only specify one of {server=} or
{dry_run=}")
+
+ if not dry_run:
+ if not server:
+ raise ValueError(
+ "Invalid execution API server URL. Please ensure that a
valid URL is configured."
+ )
+
+ try:
+ parsed_url = urlparse(server)
+ except Exception as e:
+ raise ValueError(
+ f"Invalid execution API server URL '{server}': {e}. "
+ "Please ensure that a valid URL is configured."
+ ) from e
+
+ if parsed_url.scheme not in ("http", "https"):
+ raise ValueError(
+ f"Invalid execution API server URL '{server}': "
+ "URL must use http:// or https:// scheme. "
+ "Please ensure that a valid URL is configured."
+ )
+
+ if not parsed_url.netloc:
+ raise ValueError(
+ f"Invalid execution API server URL '{server}': "
+ "URL must include a valid host. "
+ "Please ensure that a valid URL is configured."
+ )
if not dag_rel_path:
raise ValueError("dag_path is required")
diff --git a/task-sdk/tests/conftest.py b/task-sdk/tests/conftest.py
index 80c71f41a8f..d1866f52391 100644
--- a/task-sdk/tests/conftest.py
+++ b/task-sdk/tests/conftest.py
@@ -20,6 +20,7 @@ import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING, Any, NoReturn, Protocol
+from unittest.mock import patch
import pytest
@@ -271,3 +272,12 @@ def make_ti_context_dict(make_ti_context:
MakeTIContextCallable) -> MakeTIContex
return context.model_dump(exclude_unset=True, mode="json")
return _make_context_dict
+
+
[email protected]
+def patched_secrets_masker():
+ from airflow.sdk.execution_time.secrets_masker import SecretsMasker
+
+ secrets_masker = SecretsMasker()
+ with patch("airflow.sdk.execution_time.secrets_masker._secrets_masker",
return_value=secrets_masker):
+ yield secrets_masker
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index e78bb206982..d1b75f1ed22 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -27,6 +27,7 @@ import signal
import socket
import sys
import time
+from contextlib import nullcontext
from operator import attrgetter
from random import randint
from time import sleep
@@ -149,6 +150,58 @@ def client_with_ti_start(make_ti_context):
return client
[email protected]("disable_capturing")
+class TestSupervisor:
+ @pytest.mark.parametrize(
+ "server, dry_run, expectation",
+ [
+ ("/execution/", False, pytest.raises(ValueError, match="Invalid
execution API server URL")),
+ ("", False, pytest.raises(ValueError, match="Invalid execution API
server URL")),
+ ("http://localhost:8080", True, pytest.raises(ValueError,
match="Can only specify one of")),
+ (None, True, nullcontext()),
+ ("http://localhost:8080/execution/", False, nullcontext()),
+ ("https://localhost:8080/execution/", False, nullcontext()),
+ ],
+ )
+ def test_supervise(
+ self,
+ patched_secrets_masker,
+ server,
+ dry_run,
+ expectation,
+ test_dags_dir,
+ client_with_ti_start,
+ ):
+ """
+ Test that the supervisor validates server URL and dry_run parameter
combinations correctly.
+ """
+ ti = TaskInstance(
+ id=uuid7(),
+ task_id="async",
+ dag_id="super_basic_deferred_run",
+ run_id="d",
+ try_number=1,
+ dag_version_id=uuid7(),
+ )
+
+ bundle_info = BundleInfo(name="my-bundle", version=None)
+
+ kw = {
+ "ti": ti,
+ "dag_rel_path": "super_basic_deferred_run.py",
+ "token": "",
+ "bundle_info": bundle_info,
+ "dry_run": dry_run,
+ "server": server,
+ }
+ if isinstance(expectation, nullcontext):
+ kw["client"] = client_with_ti_start
+
+ with patch.dict(os.environ, local_dag_bundle_cfg(test_dags_dir,
bundle_info.name)):
+ with expectation:
+ supervise(**kw)
+
+
@pytest.mark.usefixtures("disable_capturing")
class TestWatchedSubprocess:
@pytest.fixture(autouse=True)