This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch backport-65162
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3ac48ee9991e5594fdb0a898b61faa80168341f5
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Mon Apr 13 19:45:23 2026 +0100

    Prevent Session leak from StreamingResponse API endpoints. (#65162)
    
    The comment in the test sums up what was going on, but essentially this
    resulted in `session.close` being called when the endpoint returned, but
    before the response was generated.
    
    Then SQLA was 'helpful' and re-opened it. However since the
    teardown/post-yield code has already run, this new session is never closed.
    
    This results in an open connection that leaks until Python's full GC runs
    (i.e. `gc.collect()`) -- merely going out of scope doesn't free it, as the
    Session and the RootTransaction form a reference cycle.
    
    (cherry picked from commit 5559365a4b016817fad3d16ec710b716f1ddd945)
---
 .../airflow/api_fastapi/core_api/routes/ui/grid.py |  6 +-
 .../tests/unit/api_fastapi/core_api/test_app.py    | 77 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index 0143ae81e14..bea583d5101 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -25,10 +25,10 @@ import structlog
 from fastapi import Depends, HTTPException, Query, status
 from fastapi.responses import StreamingResponse
 from sqlalchemy import exists, select
-from sqlalchemy.orm import joinedload, load_only, selectinload
+from sqlalchemy.orm import Session, joinedload, load_only, selectinload
 
 from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity
-from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.db.common import SessionDep, _get_session, 
paginated_select
 from airflow.api_fastapi.common.parameters import (
     QueryDagRunRunTypesFilter,
     QueryDagRunStateFilter,
@@ -426,7 +426,7 @@ def _build_ti_summaries(
 )
 def get_grid_ti_summaries_stream(
     dag_id: str,
-    session: SessionDep,
+    session: Annotated[Session, Depends(_get_session)],
     run_ids: Annotated[list[str] | None, Query()] = None,
 ) -> StreamingResponse:
     """
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/test_app.py 
b/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
index 4881976a673..75e44245b39 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
@@ -16,13 +16,90 @@
 # under the License.
 from __future__ import annotations
 
+import inspect
+import typing
+
 import pytest
+from fastapi.params import Depends as DependsClass
+from fastapi.responses import StreamingResponse
+from starlette.routing import Mount
+
+from airflow.api_fastapi.app import create_app
 
 from tests_common.test_utils.db import clear_db_jobs
 
 pytestmark = pytest.mark.db_test
 
 
+def _get_all_api_routes(app):
+    """Recursively yield all APIRoutes from the app and its mounted 
sub-apps."""
+    for route in getattr(app, "routes", []):
+        if isinstance(route, Mount) and hasattr(route, "app"):
+            yield from _get_all_api_routes(route.app)
+        if hasattr(route, "endpoint"):
+            yield route
+
+
+class TestStreamingEndpointSessionScope:
+    def test_no_streaming_endpoint_uses_function_scoped_depends(self):
+        """Streaming endpoints must not use function-scoped generator 
dependencies.
+
+        FastAPI's ``function_stack`` (used for ``scope="function"`` 
dependencies)
+        is torn down after the route handler returns but *before* the response 
body
+        is sent.  For ``StreamingResponse`` endpoints the response body is 
produced
+        by a generator that runs during sending, so any generator dependency 
with
+        ``scope="function"`` will have its cleanup run before the generator
+        executes.  This causes the generator to silently reopen the session via
+        autobegin, and the resulting connection is never returned to the pool.
+        """
+        # These endpoints mention StreamingResponse but only use the session
+        # *before* streaming begins — the generator does not capture it.
+        # Function scope is correct for them: close the session early rather
+        # than hold it open for the entire (potentially long) stream.
+        allowed = {
+            "airflow.api_fastapi.core_api.routes.public.log.get_log",
+            
"airflow.api_fastapi.core_api.routes.public.dag_run.wait_dag_run_until_finished",
+        }
+
+        app = create_app()
+        violations = []
+        for route in _get_all_api_routes(app):
+            try:
+                hints = typing.get_type_hints(route.endpoint, 
include_extras=True)
+            except Exception:
+                continue
+            returns_streaming = hints.get("return") is StreamingResponse
+            if not returns_streaming:
+                try:
+                    returns_streaming = "StreamingResponse" in 
inspect.getsource(route.endpoint)
+                except (OSError, TypeError):
+                    pass
+            if not returns_streaming:
+                continue
+            fqn = f"{route.endpoint.__module__}.{route.endpoint.__qualname__}"
+            if fqn in allowed:
+                continue
+            for param_name, hint in hints.items():
+                if param_name == "return":
+                    continue
+                if typing.get_origin(hint) is not typing.Annotated:
+                    continue
+                for metadata in typing.get_args(hint)[1:]:
+                    if isinstance(metadata, DependsClass) and metadata.scope 
== "function":
+                        violations.append(
+                            
f"{route.endpoint.__module__}.{route.endpoint.__qualname__}"
+                            f" parameter '{param_name}'"
+                        )
+
+        assert not violations, (
+            "Streaming endpoints must not use function-scoped dependencies 
like "
+            "SessionDep.  Use Annotated[Session, Depends(_get_session)] 
(default "
+            "request scope) instead — function-scoped cleanup runs before the "
+            "response body is streamed, leaking database connections.\n"
+            + "\n".join(f"  - {v}" for v in violations)
+        )
+
+
 class TestGzipMiddleware:
     @pytest.fixture(autouse=True)
     def setup(self):

Reply via email to