This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new a5c1b6b7f80 Prevent Session from staying opened between yields (#65179) (#65195)
a5c1b6b7f80 is described below

commit a5c1b6b7f804256bdcf180ba253675a2aae9ee61
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Tue Apr 14 11:44:32 2026 +0200

    Prevent Session from staying opened between yields (#65179) (#65195)
---
 .../airflow/api_fastapi/core_api/routes/ui/grid.py | 52 ++++++++++++----------
 .../tests/unit/api_fastapi/core_api/test_app.py    | 10 +++--
 2 files changed, 36 insertions(+), 26 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index bea583d5101..ecc4f8e4655 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -25,10 +25,10 @@ import structlog
 from fastapi import Depends, HTTPException, Query, status
 from fastapi.responses import StreamingResponse
 from sqlalchemy import exists, select
-from sqlalchemy.orm import Session, joinedload, load_only, selectinload
+from sqlalchemy.orm import joinedload, load_only, selectinload
 
 from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
-from airflow.api_fastapi.common.db.common import SessionDep, _get_session, paginated_select
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
 from airflow.api_fastapi.common.parameters import (
     QueryDagRunRunTypesFilter,
     QueryDagRunStateFilter,
@@ -67,6 +67,7 @@ from airflow.models.deadline import Deadline
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance
 from airflow.models.taskinstancehistory import TaskInstanceHistory
+from airflow.utils.session import create_session
 
 log = structlog.get_logger(logger_name=__name__)
 grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
@@ -426,7 +427,6 @@ def _build_ti_summaries(
 )
 def get_grid_ti_summaries_stream(
     dag_id: str,
-    session: Annotated[Session, Depends(_get_session)],
     run_ids: Annotated[list[str] | None, Query()] = None,
 ) -> StreamingResponse:
     """
@@ -441,28 +441,34 @@ def get_grid_ti_summaries_stream(
     """
 
     def _generate() -> Generator[str, None, None]:
+
+        # Each iteration opens and closes its own DB session so the connection is
+        # released between yields.  This prevents a slow client from holding a
+        # database connection open for the entire stream duration.
+        # See https://github.com/apache/airflow/issues/65010.
         serdag_cache: dict = {}
         for run_id in run_ids or []:
-            tis = session.execute(
-                select(
-                    TaskInstance.task_id,
-                    TaskInstance.state,
-                    TaskInstance.dag_version_id,
-                    TaskInstance.start_date,
-                    TaskInstance.end_date,
-                    DagVersion.version_number,
-                )
-                .outerjoin(DagVersion, TaskInstance.dag_version_id == DagVersion.id)
-                .where(TaskInstance.dag_id == dag_id)
-                .where(TaskInstance.run_id == run_id)
-                .order_by(TaskInstance.task_id)
-            ).all()
-            if not tis:
-                continue
-            version_id = tis[0].dag_version_id
-            if version_id not in serdag_cache:
-                serdag_cache[version_id] = _get_serdag(dag_id, version_id, session)
-            summary = _build_ti_summaries(dag_id, run_id, tis, session, serdag=serdag_cache[version_id])
+            with create_session(scoped=False) as session:
+                tis = session.execute(
+                    select(
+                        TaskInstance.task_id,
+                        TaskInstance.state,
+                        TaskInstance.dag_version_id,
+                        TaskInstance.start_date,
+                        TaskInstance.end_date,
+                        DagVersion.version_number,
+                    )
+                    .outerjoin(DagVersion, TaskInstance.dag_version_id == DagVersion.id)
+                    .where(TaskInstance.dag_id == dag_id)
+                    .where(TaskInstance.run_id == run_id)
+                    .order_by(TaskInstance.task_id)
+                ).all()
+                if not tis:
+                    continue
+                version_id = tis[0].dag_version_id
+                if version_id not in serdag_cache:
+                    serdag_cache[version_id] = _get_serdag(dag_id, version_id, session)
+                summary = _build_ti_summaries(dag_id, run_id, tis, session, serdag=serdag_cache[version_id])
             yield GridTISummaries.model_validate(summary).model_dump_json() + "\n"
 
     return StreamingResponse(content=_generate(), media_type="application/x-ndjson")
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/test_app.py b/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
index 75e44245b39..a30c803f50c 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
@@ -93,9 +93,13 @@ class TestStreamingEndpointSessionScope:
 
         assert not violations, (
             "Streaming endpoints must not use function-scoped dependencies like "
-            "SessionDep.  Use Annotated[Session, Depends(_get_session)] (default "
-            "request scope) instead — function-scoped cleanup runs before the "
-            "response body is streamed, leaking database connections.\n"
+            "SessionDep — function-scoped cleanup runs before the response body "
+            "is streamed, leaking database connections.\n"
+            "Do NOT use Annotated[Session, Depends(_get_session)] or other session dependencies "
+            "either, as this holds the DB connection open for the entire stream "
+            "duration.\n"
+            "Instead, use create_session() inside the generator to open/close a "
+            "connection for each iteration, releasing it between yields.\n"
             + "\n".join(f"  - {v}" for v in violations)
         )
 

Reply via email to