This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new 220fb39d603 [v3-0-test] Unify connection not found exceptions between 
AF2 and AF3 (#52968) (#53093)
220fb39d603 is described below

commit 220fb39d6032a040efab04f21e1541902dcbc007
Author: Amogh Desai <[email protected]>
AuthorDate: Fri Jul 11 11:47:07 2025 +0530

    [v3-0-test] Unify connection not found exceptions between AF2 and AF3 
(#52968) (#53093)
---
 airflow-core/src/airflow/models/connection.py            |  3 +--
 airflow-core/tests/unit/dag_processing/test_processor.py |  2 +-
 task-sdk/src/airflow/sdk/definitions/connection.py       | 10 ++++++++--
 task-sdk/src/airflow/sdk/execution_time/context.py       |  4 +++-
 task-sdk/tests/task_sdk/definitions/test_connections.py  | 12 ++++++++++--
 5 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/airflow-core/src/airflow/models/connection.py 
b/airflow-core/src/airflow/models/connection.py
index 2727c926e0f..9205083c9d3 100644
--- a/airflow-core/src/airflow/models/connection.py
+++ b/airflow-core/src/airflow/models/connection.py
@@ -478,8 +478,7 @@ class Connection(Base, LoggingMixin):
                 return conn
             except AirflowRuntimeError as e:
                 if e.error.error == ErrorType.CONNECTION_NOT_FOUND:
-                    log.debug("Unable to retrieve connection from 
MetastoreBackend using Task SDK")
-                    raise AirflowNotFoundException(f"The conn_id `{conn_id}` 
isn't defined")
+                    raise AirflowNotFoundException(f"The conn_id `{conn_id}` 
isn't defined") from None
                 raise
 
         # check cache first
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py 
b/airflow-core/tests/unit/dag_processing/test_processor.py
index 71b601dfdd8..1649cb63896 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -324,7 +324,7 @@ class TestDagFileProcessor:
         assert result is not None
         assert result.import_errors != {}
         if result.import_errors:
-            assert "CONNECTION_NOT_FOUND" in 
next(iter(result.import_errors.values()))
+            assert "The conn_id `my_conn` isn't defined" in 
next(iter(result.import_errors.values()))
 
     def test_import_module_in_bundle_root(self, tmp_path: pathlib.Path, 
inprocess_client):
         tmp_path.joinpath("util.py").write_text("NAME = 'dag_name'")
diff --git a/task-sdk/src/airflow/sdk/definitions/connection.py 
b/task-sdk/src/airflow/sdk/definitions/connection.py
index c66b264dce0..837a1d4e25d 100644
--- a/task-sdk/src/airflow/sdk/definitions/connection.py
+++ b/task-sdk/src/airflow/sdk/definitions/connection.py
@@ -24,7 +24,8 @@ from typing import Any
 
 import attrs
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowNotFoundException
+from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
 
 log = logging.getLogger(__name__)
 
@@ -139,7 +140,12 @@ class Connection:
     def get(cls, conn_id: str) -> Any:
         from airflow.sdk.execution_time.context import _get_connection
 
-        return _get_connection(conn_id)
+        try:
+            return _get_connection(conn_id)
+        except AirflowRuntimeError as e:
+            if e.error.error == ErrorType.CONNECTION_NOT_FOUND:
+                raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't 
defined") from None
+            raise
 
     @property
     def extra_dejson(self) -> dict:
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py 
b/task-sdk/src/airflow/sdk/execution_time/context.py
index 744b3b08886..9ee2e03c9b8 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -272,7 +272,9 @@ class ConnectionAccessor:
     """Wrapper to access Connection entries in template."""
 
     def __getattr__(self, conn_id: str) -> Any:
-        return _get_connection(conn_id)
+        from airflow.sdk.definitions.connection import Connection
+
+        return Connection.get(conn_id)
 
     def __repr__(self) -> str:
         return "<ConnectionAccessor (dynamic access)>"
diff --git a/task-sdk/tests/task_sdk/definitions/test_connections.py 
b/task-sdk/tests/task_sdk/definitions/test_connections.py
index 3bbb63a7697..6e4d977c659 100644
--- a/task-sdk/tests/task_sdk/definitions/test_connections.py
+++ b/task-sdk/tests/task_sdk/definitions/test_connections.py
@@ -23,9 +23,10 @@ from urllib.parse import urlparse
 import pytest
 
 from airflow.configuration import initialize_secrets_backends
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowNotFoundException
 from airflow.sdk import Connection
-from airflow.sdk.execution_time.comms import ConnectionResult
+from airflow.sdk.exceptions import ErrorType
+from airflow.sdk.execution_time.comms import ConnectionResult, ErrorResponse
 from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS
 
 from tests_common.test_utils.config import conf_vars
@@ -121,6 +122,13 @@ class TestConnections:
             extra=None,
         )
 
+    def test_conn_get_not_found(self, mock_supervisor_comms):
+        error_response = ErrorResponse(error=ErrorType.CONNECTION_NOT_FOUND)
+        mock_supervisor_comms.send.return_value = error_response
+
+        with pytest.raises(AirflowNotFoundException, match="The conn_id 
`mysql_conn` isn't defined"):
+            _ = Connection.get(conn_id="mysql_conn")
+
 
 class TestConnectionsFromSecrets:
     def test_get_connection_secrets_backend(self, mock_supervisor_comms, 
tmp_path):

Reply via email to