This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push:
new 220fb39d603 [v3-0-test] Unify connection not found exceptions between AF2 and AF3 (#52968) (#53093)
220fb39d603 is described below
commit 220fb39d6032a040efab04f21e1541902dcbc007
Author: Amogh Desai <[email protected]>
AuthorDate: Fri Jul 11 11:47:07 2025 +0530
[v3-0-test] Unify connection not found exceptions between AF2 and AF3 (#52968) (#53093)
---
airflow-core/src/airflow/models/connection.py | 3 +--
airflow-core/tests/unit/dag_processing/test_processor.py | 2 +-
task-sdk/src/airflow/sdk/definitions/connection.py | 10 ++++++++--
task-sdk/src/airflow/sdk/execution_time/context.py | 4 +++-
task-sdk/tests/task_sdk/definitions/test_connections.py | 12 ++++++++++--
5 files changed, 23 insertions(+), 8 deletions(-)
diff --git a/airflow-core/src/airflow/models/connection.py b/airflow-core/src/airflow/models/connection.py
index 2727c926e0f..9205083c9d3 100644
--- a/airflow-core/src/airflow/models/connection.py
+++ b/airflow-core/src/airflow/models/connection.py
@@ -478,8 +478,7 @@ class Connection(Base, LoggingMixin):
return conn
except AirflowRuntimeError as e:
if e.error.error == ErrorType.CONNECTION_NOT_FOUND:
- log.debug("Unable to retrieve connection from
MetastoreBackend using Task SDK")
- raise AirflowNotFoundException(f"The conn_id `{conn_id}`
isn't defined")
+ raise AirflowNotFoundException(f"The conn_id `{conn_id}`
isn't defined") from None
raise
# check cache first
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py
index 71b601dfdd8..1649cb63896 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -324,7 +324,7 @@ class TestDagFileProcessor:
assert result is not None
assert result.import_errors != {}
if result.import_errors:
- assert "CONNECTION_NOT_FOUND" in
next(iter(result.import_errors.values()))
+ assert "The conn_id `my_conn` isn't defined" in
next(iter(result.import_errors.values()))
def test_import_module_in_bundle_root(self, tmp_path: pathlib.Path,
inprocess_client):
tmp_path.joinpath("util.py").write_text("NAME = 'dag_name'")
diff --git a/task-sdk/src/airflow/sdk/definitions/connection.py b/task-sdk/src/airflow/sdk/definitions/connection.py
index c66b264dce0..837a1d4e25d 100644
--- a/task-sdk/src/airflow/sdk/definitions/connection.py
+++ b/task-sdk/src/airflow/sdk/definitions/connection.py
@@ -24,7 +24,8 @@ from typing import Any
import attrs
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowNotFoundException
+from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
log = logging.getLogger(__name__)
@@ -139,7 +140,12 @@ class Connection:
def get(cls, conn_id: str) -> Any:
from airflow.sdk.execution_time.context import _get_connection
- return _get_connection(conn_id)
+ try:
+ return _get_connection(conn_id)
+ except AirflowRuntimeError as e:
+ if e.error.error == ErrorType.CONNECTION_NOT_FOUND:
+ raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't
defined") from None
+ raise
@property
def extra_dejson(self) -> dict:
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py
index 744b3b08886..9ee2e03c9b8 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -272,7 +272,9 @@ class ConnectionAccessor:
"""Wrapper to access Connection entries in template."""
def __getattr__(self, conn_id: str) -> Any:
- return _get_connection(conn_id)
+ from airflow.sdk.definitions.connection import Connection
+
+ return Connection.get(conn_id)
def __repr__(self) -> str:
return "<ConnectionAccessor (dynamic access)>"
diff --git a/task-sdk/tests/task_sdk/definitions/test_connections.py b/task-sdk/tests/task_sdk/definitions/test_connections.py
index 3bbb63a7697..6e4d977c659 100644
--- a/task-sdk/tests/task_sdk/definitions/test_connections.py
+++ b/task-sdk/tests/task_sdk/definitions/test_connections.py
@@ -23,9 +23,10 @@ from urllib.parse import urlparse
import pytest
from airflow.configuration import initialize_secrets_backends
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.sdk import Connection
-from airflow.sdk.execution_time.comms import ConnectionResult
+from airflow.sdk.exceptions import ErrorType
+from airflow.sdk.execution_time.comms import ConnectionResult, ErrorResponse
from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS
from tests_common.test_utils.config import conf_vars
@@ -121,6 +122,13 @@ class TestConnections:
extra=None,
)
+ def test_conn_get_not_found(self, mock_supervisor_comms):
+ error_response = ErrorResponse(error=ErrorType.CONNECTION_NOT_FOUND)
+ mock_supervisor_comms.send.return_value = error_response
+
+ with pytest.raises(AirflowNotFoundException, match="The conn_id
`mysql_conn` isn't defined"):
+ _ = Connection.get(conn_id="mysql_conn")
+
class TestConnectionsFromSecrets:
def test_get_connection_secrets_backend(self, mock_supervisor_comms,
tmp_path):