This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new a5c1b6b7f80 Prevent Session from staying opened between yields (#65179) (#65195)
a5c1b6b7f80 is described below
commit a5c1b6b7f804256bdcf180ba253675a2aae9ee61
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Tue Apr 14 11:44:32 2026 +0200
Prevent Session from staying opened between yields (#65179) (#65195)
---
.../airflow/api_fastapi/core_api/routes/ui/grid.py | 52 ++++++++++++----------
.../tests/unit/api_fastapi/core_api/test_app.py | 10 +++--
2 files changed, 36 insertions(+), 26 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index bea583d5101..ecc4f8e4655 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -25,10 +25,10 @@ import structlog
from fastapi import Depends, HTTPException, Query, status
from fastapi.responses import StreamingResponse
from sqlalchemy import exists, select
-from sqlalchemy.orm import Session, joinedload, load_only, selectinload
+from sqlalchemy.orm import joinedload, load_only, selectinload
from airflow.api_fastapi.auth.managers.models.resource_details import
DagAccessEntity
-from airflow.api_fastapi.common.db.common import SessionDep, _get_session,
paginated_select
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
from airflow.api_fastapi.common.parameters import (
QueryDagRunRunTypesFilter,
QueryDagRunStateFilter,
@@ -67,6 +67,7 @@ from airflow.models.deadline import Deadline
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancehistory import TaskInstanceHistory
+from airflow.utils.session import create_session
log = structlog.get_logger(logger_name=__name__)
grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
@@ -426,7 +427,6 @@ def _build_ti_summaries(
)
def get_grid_ti_summaries_stream(
dag_id: str,
- session: Annotated[Session, Depends(_get_session)],
run_ids: Annotated[list[str] | None, Query()] = None,
) -> StreamingResponse:
"""
@@ -441,28 +441,34 @@ def get_grid_ti_summaries_stream(
"""
def _generate() -> Generator[str, None, None]:
+
+ # Each iteration opens and closes its own DB session so the connection is
+ # released between yields. This prevents a slow client from holding a
+ # database connection open for the entire stream duration.
+ # See https://github.com/apache/airflow/issues/65010.
serdag_cache: dict = {}
for run_id in run_ids or []:
- tis = session.execute(
- select(
- TaskInstance.task_id,
- TaskInstance.state,
- TaskInstance.dag_version_id,
- TaskInstance.start_date,
- TaskInstance.end_date,
- DagVersion.version_number,
- )
- .outerjoin(DagVersion, TaskInstance.dag_version_id ==
DagVersion.id)
- .where(TaskInstance.dag_id == dag_id)
- .where(TaskInstance.run_id == run_id)
- .order_by(TaskInstance.task_id)
- ).all()
- if not tis:
- continue
- version_id = tis[0].dag_version_id
- if version_id not in serdag_cache:
- serdag_cache[version_id] = _get_serdag(dag_id, version_id,
session)
- summary = _build_ti_summaries(dag_id, run_id, tis, session,
serdag=serdag_cache[version_id])
+ with create_session(scoped=False) as session:
+ tis = session.execute(
+ select(
+ TaskInstance.task_id,
+ TaskInstance.state,
+ TaskInstance.dag_version_id,
+ TaskInstance.start_date,
+ TaskInstance.end_date,
+ DagVersion.version_number,
+ )
+ .outerjoin(DagVersion, TaskInstance.dag_version_id ==
DagVersion.id)
+ .where(TaskInstance.dag_id == dag_id)
+ .where(TaskInstance.run_id == run_id)
+ .order_by(TaskInstance.task_id)
+ ).all()
+ if not tis:
+ continue
+ version_id = tis[0].dag_version_id
+ if version_id not in serdag_cache:
+ serdag_cache[version_id] = _get_serdag(dag_id, version_id,
session)
+ summary = _build_ti_summaries(dag_id, run_id, tis, session,
serdag=serdag_cache[version_id])
yield GridTISummaries.model_validate(summary).model_dump_json() +
"\n"
return StreamingResponse(content=_generate(),
media_type="application/x-ndjson")
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/test_app.py b/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
index 75e44245b39..a30c803f50c 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
@@ -93,9 +93,13 @@ class TestStreamingEndpointSessionScope:
assert not violations, (
"Streaming endpoints must not use function-scoped dependencies
like "
- "SessionDep. Use Annotated[Session, Depends(_get_session)]
(default "
- "request scope) instead — function-scoped cleanup runs before the "
- "response body is streamed, leaking database connections.\n"
+ "SessionDep — function-scoped cleanup runs before the response
body "
+ "is streamed, leaking database connections.\n"
+ "Do NOT use Annotated[Session, Depends(_get_session)] or other
session dependencies "
+ "either, as this holds the DB connection open for the entire
stream "
+ "duration.\n"
+ "Instead, use create_session() inside the generator to open/close
a "
+ "connection for each iteration, releasing it between yields.\n"
+ "\n".join(f" - {v}" for v in violations)
)