This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 364547048fd302e79c91b29ac68db18258187415 Author: Wei Lee <[email protected]> AuthorDate: Sat Sep 13 07:13:41 2025 +0800 fix(hitl): check whether task instance is deferring when fetching pending actions (#55539) (cherry picked from commit 81cfe3db827b093c413e8d2be9a1e74d1b2e7fb5) --- airflow-core/src/airflow/api_fastapi/common/parameters.py | 5 ++++- airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py | 6 +++++- airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py | 3 ++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index f1d876f29b6..7a555454cd1 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -718,7 +718,10 @@ class _PendingActionsFilter(BaseParam[bool]): pending_actions_count_subquery = ( sql_select(func.count(HITLDetail.ti_id)) .join(TaskInstance, HITLDetail.ti_id == TaskInstance.id) - .where(HITLDetail.response_at.is_(None)) + .where( + HITLDetail.response_at.is_(None), + TaskInstance.state == TaskInstanceState.DEFERRED, + ) .where(TaskInstance.dag_id == DagModel.dag_id) .scalar_subquery() ) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py index e953017a56b..9cd5698800a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py @@ -67,6 +67,7 @@ from airflow.api_fastapi.core_api.security import ( from airflow.models import DagModel, DagRun from airflow.models.hitl import HITLDetail from airflow.models.taskinstance import TaskInstance +from airflow.utils.state import TaskInstanceState dags_router = AirflowRouter(prefix="/dags", tags=["DAG"]) @@ -201,7 +202,10 @@ def get_dags( HITLDetail, ) .join(TaskInstance, HITLDetail.ti_id == TaskInstance.id) - .where(HITLDetail.response_at.is_(None)) + .where( + HITLDetail.response_at.is_(None), + TaskInstance.state == TaskInstanceState.DEFERRED, + ) .where(TaskInstance.dag_id.in_([dag.dag_id for dag in dags])) .order_by(TaskInstance.dag_id) ) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py index f85d7bda49b..ea1f6293151 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py @@ -29,7 +29,7 @@ from airflow.models import DagRun from airflow.models.hitl import HITLDetail from airflow.sdk.timezone import utcnow from airflow.utils.session import provide_session -from airflow.utils.state import DagRunState +from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils.types import DagRunTriggeredByType, DagRunType from unit.api_fastapi.core_api.routes.public.test_dags import ( @@ -136,6 +136,7 @@ class TestGetDagRuns(TestPublicDagEndpoint): run_id=f"hitl_run_{ti_i}", task_id=f"test_task_{ti_i}", session=session, + state=TaskInstanceState.DEFERRED, ) for ti_i in range(TI_COUNT) ]
