This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7ee652646f0 Remove last usages of grid data and remove grid_data
endpoint. (#52672)
7ee652646f0 is described below
commit 7ee652646f0b76c79d1ad4aedf994a93537d3a80
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Jul 9 10:48:54 2025 -0700
Remove last usages of grid data and remove grid_data endpoint. (#52672)
Get rid of the old grid_data endpoint. It had already been replaced in the grid
view, but a few uses remained in the graph view, namely for tooltips.
---
.../api_fastapi/core_api/datamodels/ui/grid.py | 42 +-
.../api_fastapi/core_api/openapi/_private_ui.yaml | 283 -------
.../airflow/api_fastapi/core_api/routes/ui/grid.py | 210 -----
.../api_fastapi/core_api/services/ui/grid.py | 243 +-----
.../src/airflow/ui/openapi-gen/queries/common.ts | 18 -
.../ui/openapi-gen/queries/ensureQueryData.ts | 35 -
.../src/airflow/ui/openapi-gen/queries/prefetch.ts | 35 -
.../src/airflow/ui/openapi-gen/queries/queries.ts | 35 -
.../src/airflow/ui/openapi-gen/queries/suspense.ts | 35 -
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 222 -----
.../ui/openapi-gen/requests/services.gen.ts | 51 +-
.../airflow/ui/openapi-gen/requests/types.gen.ts | 83 --
.../airflow/ui/src/components/Graph/TaskNode.tsx | 12 +-
.../ui/src/components/Graph/reactflowUtils.ts | 4 +-
.../ui/src/components/TaskInstanceTooltip.tsx | 34 +-
.../ui/src/layouts/Details/DetailsLayout.tsx | 2 -
.../airflow/ui/src/layouts/Details/Graph/Graph.tsx | 39 +-
.../airflow/ui/src/layouts/Details/Grid/utils.ts | 6 +-
.../ui/src/pages/Asset/CreateAssetEventModal.tsx | 2 -
airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx | 7 +-
airflow-core/src/airflow/ui/src/pages/Run/Run.tsx | 12 +-
.../src/airflow/ui/src/pages/Task/Task.tsx | 17 +-
.../ui/src/pages/TaskInstance/TaskInstance.tsx | 49 +-
.../src/airflow/ui/src/queries/useClearRun.ts | 2 -
.../ui/src/queries/useClearTaskInstances.ts | 2 -
.../src/airflow/ui/src/queries/useDeleteDagRun.ts | 7 +-
.../ui/src/queries/useDeleteTaskInstance.ts | 2 -
.../airflow/ui/src/queries/useGridTISummaries.ts | 1 +
.../src/airflow/ui/src/queries/usePatchDagRun.ts | 2 -
.../airflow/ui/src/queries/usePatchTaskInstance.ts | 2 -
.../ui/src/queries/useRefreshOnNewDagRuns.ts | 2 -
.../src/airflow/ui/src/queries/useTrigger.ts | 2 -
.../api_fastapi/core_api/routes/ui/test_grid.py | 925 +++------------------
33 files changed, 165 insertions(+), 2258 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py
index 6a00f1088cf..b523dce96ff 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py
@@ -19,24 +19,9 @@ from __future__ import annotations
from datetime import datetime
-from pydantic import BaseModel, Field
+from pydantic import BaseModel
-from airflow.utils.state import DagRunState, TaskInstanceState
-from airflow.utils.types import DagRunType
-
-
-class GridTaskInstanceSummary(BaseModel):
- """Task Instance Summary model for the Grid UI."""
-
- task_id: str
- try_number: int
- start_date: datetime | None
- end_date: datetime | None
- queued_dttm: datetime | None
- child_states: dict[str, int] | None
- task_count: int
- state: TaskInstanceState | None
- note: str | None
+from airflow.utils.state import TaskInstanceState
class LightGridTaskInstanceSummary(BaseModel):
@@ -49,32 +34,9 @@ class LightGridTaskInstanceSummary(BaseModel):
max_end_date: datetime | None
-class GridDAGRunwithTIs(BaseModel):
- """DAG Run model for the Grid UI."""
-
- run_id: str = Field(serialization_alias="dag_run_id",
validation_alias="run_id")
- queued_at: datetime | None
- start_date: datetime | None
- end_date: datetime | None
- run_after: datetime
- state: DagRunState
- run_type: DagRunType
- logical_date: datetime | None
- data_interval_start: datetime | None
- data_interval_end: datetime | None
- note: str | None
- task_instances: list[GridTaskInstanceSummary]
-
-
class GridTISummaries(BaseModel):
"""DAG Run model for the Grid UI."""
run_id: str
dag_id: str
task_instances: list[LightGridTaskInstanceSummary]
-
-
-class GridResponse(BaseModel):
- """Response model for the Grid UI."""
-
- dag_runs: list[GridDAGRunwithTIs]
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index cb48bdad87b..3c7a4abae7c 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -474,144 +474,6 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
- /ui/grid/{dag_id}:
- get:
- tags:
- - Grid
- summary: Grid Data
- description: Return grid data.
- operationId: grid_data
- security:
- - OAuth2PasswordBearer: []
- parameters:
- - name: dag_id
- in: path
- required: true
- schema:
- type: string
- title: Dag Id
- - name: include_upstream
- in: query
- required: false
- schema:
- type: boolean
- default: false
- title: Include Upstream
- - name: include_downstream
- in: query
- required: false
- schema:
- type: boolean
- default: false
- title: Include Downstream
- - name: root
- in: query
- required: false
- schema:
- anyOf:
- - type: string
- - type: 'null'
- title: Root
- - name: offset
- in: query
- required: false
- schema:
- type: integer
- minimum: 0
- default: 0
- title: Offset
- - name: run_type
- in: query
- required: false
- schema:
- type: array
- items:
- type: string
- title: Run Type
- - name: state
- in: query
- required: false
- schema:
- type: array
- items:
- type: string
- title: State
- - name: limit
- in: query
- required: false
- schema:
- type: integer
- minimum: 0
- default: 50
- title: Limit
- - name: order_by
- in: query
- required: false
- schema:
- type: string
- default: id
- title: Order By
- - name: run_after_gte
- in: query
- required: false
- schema:
- anyOf:
- - type: string
- format: date-time
- - type: 'null'
- title: Run After Gte
- - name: run_after_lte
- in: query
- required: false
- schema:
- anyOf:
- - type: string
- format: date-time
- - type: 'null'
- title: Run After Lte
- - name: logical_date_gte
- in: query
- required: false
- schema:
- anyOf:
- - type: string
- format: date-time
- - type: 'null'
- title: Logical Date Gte
- - name: logical_date_lte
- in: query
- required: false
- schema:
- anyOf:
- - type: string
- format: date-time
- - type: 'null'
- title: Logical Date Lte
- responses:
- '200':
- description: Successful Response
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/GridResponse'
- '400':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Bad Request
- '404':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Not Found
- '422':
- description: Validation Error
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPValidationError'
/ui/grid/structure/{dag_id}:
get:
tags:
@@ -1794,81 +1656,6 @@ components:
- text
- href
title: ExtraMenuItem
- GridDAGRunwithTIs:
- properties:
- dag_run_id:
- type: string
- title: Dag Run Id
- queued_at:
- anyOf:
- - type: string
- format: date-time
- - type: 'null'
- title: Queued At
- start_date:
- anyOf:
- - type: string
- format: date-time
- - type: 'null'
- title: Start Date
- end_date:
- anyOf:
- - type: string
- format: date-time
- - type: 'null'
- title: End Date
- run_after:
- type: string
- format: date-time
- title: Run After
- state:
- $ref: '#/components/schemas/DagRunState'
- run_type:
- $ref: '#/components/schemas/DagRunType'
- logical_date:
- anyOf:
- - type: string
- format: date-time
- - type: 'null'
- title: Logical Date
- data_interval_start:
- anyOf:
- - type: string
- format: date-time
- - type: 'null'
- title: Data Interval Start
- data_interval_end:
- anyOf:
- - type: string
- format: date-time
- - type: 'null'
- title: Data Interval End
- note:
- anyOf:
- - type: string
- - type: 'null'
- title: Note
- task_instances:
- items:
- $ref: '#/components/schemas/GridTaskInstanceSummary'
- type: array
- title: Task Instances
- type: object
- required:
- - dag_run_id
- - queued_at
- - start_date
- - end_date
- - run_after
- - state
- - run_type
- - logical_date
- - data_interval_start
- - data_interval_end
- - note
- - task_instances
- title: GridDAGRunwithTIs
- description: DAG Run model for the Grid UI.
GridNodeResponse:
properties:
id:
@@ -1904,18 +1691,6 @@ components:
- is_mapped
title: GridNodeResponse
description: Base Node serializer for responses.
- GridResponse:
- properties:
- dag_runs:
- items:
- $ref: '#/components/schemas/GridDAGRunwithTIs'
- type: array
- title: Dag Runs
- type: object
- required:
- - dag_runs
- title: GridResponse
- description: Response model for the Grid UI.
GridRunsResponse:
properties:
dag_id:
@@ -1991,64 +1766,6 @@ components:
- task_instances
title: GridTISummaries
description: DAG Run model for the Grid UI.
- GridTaskInstanceSummary:
- properties:
- task_id:
- type: string
- title: Task Id
- try_number:
- type: integer
- title: Try Number
- start_date:
- anyOf:
- - type: string
- format: date-time
- - type: 'null'
- title: Start Date
- end_date:
- anyOf:
- - type: string
- format: date-time
- - type: 'null'
- title: End Date
- queued_dttm:
- anyOf:
- - type: string
- format: date-time
- - type: 'null'
- title: Queued Dttm
- child_states:
- anyOf:
- - additionalProperties:
- type: integer
- type: object
- - type: 'null'
- title: Child States
- task_count:
- type: integer
- title: Task Count
- state:
- anyOf:
- - $ref: '#/components/schemas/TaskInstanceState'
- - type: 'null'
- note:
- anyOf:
- - type: string
- - type: 'null'
- title: Note
- type: object
- required:
- - task_id
- - try_number
- - start_date
- - end_date
- - queued_dttm
- - child_states
- - task_count
- - state
- - note
- title: GridTaskInstanceSummary
- description: Task Instance Summary model for the Grid UI.
HTTPExceptionResponse:
properties:
detail:
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index 56118a9e71e..5c7494d3b70 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -18,23 +18,15 @@
from __future__ import annotations
import collections
-import itertools
from typing import TYPE_CHECKING, Annotated
import structlog
from fastapi import Depends, HTTPException, status
from sqlalchemy import select
-from sqlalchemy.orm import joinedload, selectinload
-from airflow import DAG
from airflow.api_fastapi.auth.managers.models.resource_details import
DagAccessEntity
-from airflow.api_fastapi.common.dagbag import DagBagDep
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
from airflow.api_fastapi.common.parameters import (
- QueryDagRunRunTypesFilter,
- QueryDagRunStateFilter,
- QueryIncludeDownstream,
- QueryIncludeUpstream,
QueryLimit,
QueryOffset,
RangeFilter,
@@ -48,8 +40,6 @@ from airflow.api_fastapi.core_api.datamodels.ui.common import
(
LatestRunResponse,
)
from airflow.api_fastapi.core_api.datamodels.ui.grid import (
- GridDAGRunwithTIs,
- GridResponse,
GridTISummaries,
)
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
@@ -57,16 +47,11 @@ from airflow.api_fastapi.core_api.security import
requires_access_dag
from airflow.api_fastapi.core_api.services.ui.grid import (
_find_aggregates,
_merge_node_dicts,
- fill_task_instance_summaries,
- get_child_task_map,
- get_task_group_map,
)
from airflow.models.dag_version import DagVersion
from airflow.models.dagrun import DagRun
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance
-from airflow.models.taskinstancehistory import TaskInstanceHistory
-from airflow.utils.state import TaskInstanceState
from airflow.utils.task_group import (
get_task_group_children_getter,
task_group_to_dict_grid,
@@ -76,201 +61,6 @@ log = structlog.get_logger(logger_name=__name__)
grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
-@grid_router.get(
- "/{dag_id}",
- responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST,
status.HTTP_404_NOT_FOUND]),
- dependencies=[
- Depends(requires_access_dag(method="GET",
access_entity=DagAccessEntity.TASK_INSTANCE)),
- Depends(requires_access_dag(method="GET",
access_entity=DagAccessEntity.RUN)),
- ],
- response_model_exclude_none=True,
-)
-def grid_data(
- dag_id: str,
- session: SessionDep,
- offset: QueryOffset,
- dag_bag: DagBagDep,
- run_type: QueryDagRunRunTypesFilter,
- state: QueryDagRunStateFilter,
- limit: QueryLimit,
- order_by: Annotated[
- SortParam,
- Depends(SortParam(["run_after", "logical_date", "start_date",
"end_date"], DagRun).dynamic_depends()),
- ],
- run_after: Annotated[RangeFilter,
Depends(datetime_range_filter_factory("run_after", DagRun))],
- logical_date: Annotated[RangeFilter,
Depends(datetime_range_filter_factory("logical_date", DagRun))],
- include_upstream: QueryIncludeUpstream = False,
- include_downstream: QueryIncludeDownstream = False,
- root: str | None = None,
-) -> GridResponse:
- """Return grid data."""
- dag: DAG = dag_bag.get_dag(dag_id)
- if not dag:
- raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
-
- # Retrieve, sort the previous DAG Runs
- base_query = (
- select(DagRun)
- .join(DagRun.dag_run_note, isouter=True)
-
.options(joinedload(DagRun.task_instances).joinedload(TaskInstance.dag_version))
-
.options(joinedload(DagRun.task_instances_histories).joinedload(TaskInstanceHistory.dag_version))
- .where(DagRun.dag_id == dag.dag_id)
- )
-
- # This comparison is to falls to DAG timetable when no order_by is provided
- if order_by.value == order_by.get_primary_key_string():
- order_by = SortParam(
- allowed_attrs=[run_ordering for run_ordering in
dag.timetable.run_ordering], model=DagRun
- ).set_value(dag.timetable.run_ordering[0])
-
- dag_runs_select_filter, _ = paginated_select(
- statement=base_query,
- filters=[
- run_type,
- state,
- run_after,
- logical_date,
- ],
- order_by=order_by,
- offset=offset,
- limit=limit,
- )
-
- dag_runs = list(session.scalars(dag_runs_select_filter).unique())
- # Check if there are any DAG Runs with given criteria to eliminate
unnecessary queries/errors
- if not dag_runs:
- return GridResponse(dag_runs=[])
-
- # Retrieve, sort and encode the Task Instances
- tis_of_dag_runs, _ = paginated_select(
- statement=select(TaskInstance)
- .options(selectinload(TaskInstance.task_instance_note))
- .where(TaskInstance.dag_id == dag.dag_id)
- .where(TaskInstance.run_id.in_([dag_run.run_id for dag_run in
dag_runs])),
- filters=[],
- order_by=SortParam(allowed_attrs=["task_id", "run_id"],
model=TaskInstance).set_value("task_id"),
- offset=offset,
- limit=None,
- )
-
- task_instances = session.scalars(tis_of_dag_runs)
-
- tis_by_run_id: dict[str, list[TaskInstance]] =
collections.defaultdict(list)
- for ti in task_instances:
- tis_by_run_id[ti.run_id].append(ti)
-
- # Generate Grouped Task Instances
- task_node_map_exclude = None
- if root:
- task_node_map_exclude = get_task_group_map(
- dag=dag.partial_subset(
- task_ids=root,
- include_upstream=include_upstream,
- include_downstream=include_downstream,
- )
- )
-
- # Group the Task Instances by Parent Task (TaskGroup or Mapped) and All
Task Instances
- parent_tis: dict[tuple[str, str], list] = collections.defaultdict(list)
- all_tis: dict[tuple[str, str], list] = collections.defaultdict(list)
-
- for tis in tis_by_run_id.values():
- # this is a simplification - we account for structure based on the
first task
- version = tis[0].dag_version
- if not version:
- version = session.scalar(
- select(DagVersion)
- .where(
- DagVersion.dag_id == tis[0].dag_id,
- )
- .order_by(DagVersion.id) # ascending cus this is mostly for
pre-3.0 upgrade
- .limit(1)
- )
- if not version.serialized_dag:
- log.error(
- "No serialized dag found",
- dag_id=tis[0].dag_id,
- version_id=version.id,
- version_number=version.version_number,
- )
- continue
- run_dag = version.serialized_dag.dag
- task_node_map = get_task_group_map(dag=run_dag)
- for ti in tis:
- # Skip the Task Instances if upstream/downstream filtering is
applied or if the task was removed.
- if (
- task_node_map_exclude and ti.task_id not in
task_node_map_exclude
- ) or ti.state == TaskInstanceState.REMOVED:
- continue
-
- # Populate the Grouped Task Instances (All Task Instances except
the Parent Task Instances)
- if ti.task_id in get_child_task_map(
- parent_task_id=task_node_map[ti.task_id]["parent_id"],
task_node_map=task_node_map
- ):
- all_tis[(ti.task_id, ti.run_id)].append(ti)
- # Populate the Parent Task Instances
- parent_id = task_node_map[ti.task_id]["parent_id"]
- if not parent_id and task_node_map[ti.task_id]["is_group"]:
- parent_tis[(ti.task_id, ti.run_id)].append(ti)
- elif parent_id and task_node_map[parent_id]["is_group"]:
- parent_tis[(parent_id, ti.run_id)].append(ti)
-
- # Clear task_node_map_exclude to free up memory
- if task_node_map_exclude:
- task_node_map_exclude.clear()
-
- task_node_map = get_task_group_map(dag=dag)
- # Extend subgroup task instances to parent task instances to calculate the
aggregates states
- task_group_map = {k: v for k, v in task_node_map.items() if v["is_group"]}
- parent_tis.update(
- {
- (task_id_parent, run_id): parent_tis[(task_id_parent, run_id)] +
parent_tis[(task_id, run_id)]
- for task_id, task_map in task_group_map.items()
- if task_map["is_group"]
- for (task_id_parent, run_id), tis in list(parent_tis.items())
- if task_id_parent == task_map["parent_id"]
- }
- )
- # Create the Task Instance Summaries to be used in the Grid Response
- task_instance_summaries: dict[str, list] = {
- run_id: [] for _, run_id in itertools.chain(parent_tis, all_tis)
- }
-
- # Fill the Task Instance Summaries for the Parent and Grouped Task
Instances.
- # First the Parent Task Instances because they are used in the Grouped
Task Instances
- fill_task_instance_summaries(
- grouped_task_instances=parent_tis,
- task_instance_summaries_to_fill=task_instance_summaries,
- session=session,
- )
- # Fill the Task Instance Summaries for the Grouped Task Instances
- fill_task_instance_summaries(
- grouped_task_instances=all_tis,
- task_instance_summaries_to_fill=task_instance_summaries,
- session=session,
- )
-
- # Aggregate the Task Instances by DAG Run
- grid_dag_runs = [
- GridDAGRunwithTIs(
- run_id=dag_run.run_id,
- queued_at=dag_run.queued_at,
- start_date=dag_run.start_date,
- end_date=dag_run.end_date,
- run_after=dag_run.run_after,
- logical_date=dag_run.logical_date,
- state=dag_run.state,
- run_type=dag_run.run_type,
- data_interval_start=dag_run.data_interval_start,
- data_interval_end=dag_run.data_interval_end,
- note=dag_run.note,
- task_instances=task_instance_summaries.get(dag_run.run_id, []),
- )
- for dag_run in dag_runs
- ]
- return GridResponse(dag_runs=grid_dag_runs)
-
-
def _get_latest_serdag(dag_id, session):
serdag = session.scalar(
select(SerializedDagModel)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
index de92e6e6fa4..6b8f7bbb372 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
@@ -17,243 +17,25 @@
from __future__ import annotations
-import contextlib
from collections import Counter
from collections.abc import Iterable
-from uuid import UUID
import structlog
from sqlalchemy import select
-from typing_extensions import Any
-from airflow import DAG
-from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.parameters import (
state_priority,
)
-from airflow.api_fastapi.core_api.datamodels.ui.grid import (
- GridTaskInstanceSummary,
-)
-from airflow.api_fastapi.core_api.datamodels.ui.structure import (
- StructureDataResponse,
-)
-from airflow.models.baseoperator import BaseOperator as DBBaseOperator
from airflow.models.dag_version import DagVersion
from airflow.models.taskmap import TaskMap
from airflow.sdk import BaseOperator
-from airflow.sdk.definitions._internal.abstractoperator import NotMapped
-from airflow.sdk.definitions._internal.expandinput import NotFullyPopulated
from airflow.sdk.definitions.mappedoperator import MappedOperator
from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
-from airflow.serialization.serialized_objects import SerializedDAG
-from airflow.utils.state import TaskInstanceState
-from airflow.utils.task_group import get_task_group_children_getter,
task_group_to_dict
+from airflow.utils.task_group import get_task_group_children_getter
log = structlog.get_logger(logger_name=__name__)
-def get_task_group_map(dag: DAG) -> dict[str, dict[str, Any]]:
- """
- Get the Task Group Map for the DAG.
-
- :param dag: DAG
-
- :return: Task Group Map
- """
- task_nodes: dict[str, dict[str, Any]] = {}
-
- def _is_task_node_mapped_task_group(task_node: BaseOperator |
MappedTaskGroup | TaskMap | None) -> bool:
- """Check if the Task Node is a Mapped Task Group."""
- return type(task_node) is MappedTaskGroup
-
- def _append_child_task_count_to_parent(
- child_task_count: int | MappedTaskGroup | TaskMap | MappedOperator |
None,
- parent_node: BaseOperator | MappedTaskGroup | TaskMap | None,
- ):
- """
- Append the Child Task Count to the Parent.
-
- This method should only be used for Mapped Models.
- """
- if isinstance(parent_node, TaskGroup):
- # Remove the regular task counted in parent_node
- task_nodes[parent_node.node_id]["task_count"].append(-1)
- # Add the mapped task to the parent_node
-
task_nodes[parent_node.node_id]["task_count"].append(child_task_count)
-
- def _fill_task_group_map(
- task_node: BaseOperator | MappedTaskGroup | TaskMap | None,
- parent_node: BaseOperator | MappedTaskGroup | TaskMap | None,
- ) -> None:
- """Recursively fill the Task Group Map."""
- if task_node is None:
- return
-
- if isinstance(task_node, MappedOperator):
- task_nodes[task_node.node_id] = {
- "is_group": False,
- "parent_id": parent_node.node_id if parent_node else None,
- "task_count": [task_node],
- }
- # Add the Task Count to the Parent Node because parent node is a
Task Group
- _append_child_task_count_to_parent(child_task_count=task_node,
parent_node=parent_node)
- return
-
- if isinstance(task_node, TaskGroup):
- task_count = task_node if
_is_task_node_mapped_task_group(task_node) else len(task_node.children)
- task_nodes[task_node.node_id] = {
- "is_group": True,
- "parent_id": parent_node.node_id if parent_node else None,
- "task_count": [task_count],
- }
- for child in get_task_group_children_getter()(task_node):
- _fill_task_group_map(task_node=child, parent_node=task_node)
- return
-
- if isinstance(task_node, BaseOperator):
- task_nodes[task_node.task_id] = {
- "is_group": False,
- "parent_id": parent_node.node_id if parent_node else None,
- "task_count": task_nodes[parent_node.node_id]["task_count"]
- if _is_task_node_mapped_task_group(parent_node) and parent_node
- else [1],
- }
- # No Need to Add the Task Count to the Parent Node, these are
already counted in Add the Parent
- return
-
- for node in [child for child in
get_task_group_children_getter()(dag.task_group)]:
- _fill_task_group_map(task_node=node, parent_node=None)
-
- return task_nodes
-
-
-def get_child_task_map(parent_task_id: str, task_node_map: dict[str, dict[str,
Any]]):
- """Get the Child Task Map for the Parent Task ID."""
- return [task_id for task_id, task_map in task_node_map.items() if
task_map["parent_id"] == parent_task_id]
-
-
-def _count_tis(node: int | MappedTaskGroup | MappedOperator, run_id: str,
session: SessionDep) -> int:
- if not isinstance(node, (MappedTaskGroup, MappedOperator)):
- return node
- with contextlib.suppress(NotFullyPopulated, NotMapped):
- return DBBaseOperator.get_mapped_ti_count(node, run_id=run_id,
session=session)
- # If the downstream is not actually mapped, or we don't have information to
- # determine the length yet, simply return 1 to represent the stand-in ti.
- return 1
-
-
-def fill_task_instance_summaries(
- grouped_task_instances: dict[tuple[str, str], list],
- task_instance_summaries_to_fill: dict[str, list],
- session: SessionDep,
-) -> None:
- """
- Fill the Task Instance Summaries for the Grouped Task Instances.
-
- :param grouped_task_instances: Grouped Task Instances
- :param task_instance_summaries_to_fill: Task Instance Summaries to fill
- :param session: Session
-
- :return: None
- """
- # Additional logic to calculate the overall states to cascade recursive
task states
- overall_states: dict[tuple[str, str], str] = {
- (task_id, run_id): next(
- (
- str(state.value)
- for state in state_priority
- for ti in tis
- if state is not None and ti.state == state
- ),
- "no_status",
- )
- for (task_id, run_id), tis in grouped_task_instances.items()
- }
-
- serdag_cache: dict[UUID, SerializedDAG] = {}
- task_group_map_cache: dict[UUID, dict[str, dict[str, Any]]] = {}
-
- for (task_id, run_id), tis in grouped_task_instances.items():
- if not tis:
- continue
-
- sdm = _get_serdag(tis[0], session)
- serdag_cache[sdm.id] = serdag_cache.get(sdm.id) or sdm.dag
- dag = serdag_cache[sdm.id]
- task_group_map_cache[sdm.id] = task_group_map_cache.get(sdm.id) or
get_task_group_map(dag=dag)
- task_node_map = task_group_map_cache[sdm.id]
- ti_try_number = max([ti.try_number for ti in tis])
- ti_start_date = min([ti.start_date for ti in tis if ti.start_date],
default=None)
- ti_end_date = max([ti.end_date for ti in tis if ti.end_date],
default=None)
- ti_queued_dttm = min([ti.queued_dttm for ti in tis if ti.queued_dttm],
default=None)
- ti_note = min([ti.note for ti in tis if ti.note], default=None)
-
- # Calculate the child states for the task
- # Initialize the child states with 0
- child_states = {"no_status" if state is None else state.name.lower():
0 for state in state_priority}
- # Update Task States for non-grouped tasks
- child_states.update(
- {
- "no_status" if state is None else state.name.lower(): len(
- [ti for ti in tis if ti.state == state]
- if not task_node_map[task_id]["is_group"]
- else [
- ti
- for ti in tis
- if ti.state == state and ti.task_id in
get_child_task_map(task_id, task_node_map)
- ]
- )
- for state in state_priority
- }
- )
-
- # Update Nested Task Group States by aggregating the child states
- child_states.update(
- {
- overall_states[(task_node_id, run_id)].lower():
child_states.get(
- overall_states[(task_node_id, run_id)].lower(), 0
- )
- + 1
- for task_node_id in get_child_task_map(task_id, task_node_map)
- if task_node_map[task_node_id]["is_group"] and (task_node_id,
run_id) in overall_states
- }
- )
-
- # Get the overall state for the task
- overall_ti_state = next(
- (
- state
- for state in state_priority
- for state_name, state_count in child_states.items()
- if state_count > 0 and state_name == state
- ),
- "no_status",
- )
-
- # Task Count is either integer or a TaskGroup to get the task count
- task_instance_summaries_to_fill[run_id].append(
- GridTaskInstanceSummary(
- task_id=task_id,
- try_number=ti_try_number,
- start_date=ti_start_date,
- end_date=ti_end_date,
- queued_dttm=ti_queued_dttm,
- child_states=child_states,
- task_count=sum(_count_tis(n, run_id, session) for n in
task_node_map[task_id]["task_count"]),
- state=TaskInstanceState[overall_ti_state.upper()]
- if overall_ti_state != "no_status"
- else None,
- note=ti_note,
- )
- )
-
-
-def get_structure_from_dag(dag: DAG) -> StructureDataResponse:
- """If we do not have TIs, we just get the structure from the DAG."""
- nodes = [task_group_to_dict(child) for child in
get_task_group_children_getter()(dag.task_group)]
- return StructureDataResponse(nodes=nodes, edges=[])
-
-
def _get_serdag(ti, session):
dag_version = ti.dag_version
if not dag_version:
@@ -277,29 +59,13 @@ def _get_serdag(ti, session):
return dag_version.serialized_dag
-def get_combined_structure(task_instances, session):
- """Given task instances with varying DAG versions, get a combined
structure."""
- merged_nodes = []
- # we dedup with serdag, as serdag.dag varies somehow?
- serdags = {_get_serdag(ti, session) for ti in task_instances}
- dags = []
- for serdag in serdags:
- if serdag:
- dags.append(serdag.dag)
- for dag in dags:
- nodes = [task_group_to_dict(child) for child in
get_task_group_children_getter()(dag.task_group)]
- _merge_node_dicts(merged_nodes, nodes)
-
- return StructureDataResponse(nodes=merged_nodes, edges=[])
-
-
def _merge_node_dicts(current, new) -> None:
current_ids = {node["id"] for node in current}
for node in new:
if node["id"] in current_ids:
current_node = _get_node_by_id(current, node["id"])
# if we have children, merge those as well
- if "children" in current_node:
+ if current_node.get("children"):
_merge_node_dicts(current_node["children"], node["children"])
else:
current.append(node)
@@ -312,11 +78,6 @@ def _get_node_by_id(nodes, node_id):
return {}
-def _is_task_node_mapped_task_group(task_node: BaseOperator | MappedTaskGroup
| TaskMap | None) -> bool:
- """Check if the Task Node is a Mapped Task Group."""
- return type(task_node) is MappedTaskGroup
-
-
def agg_state(states):
states = Counter(states)
for state in state_priority:
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
index 8410c0ba474..143ec4c7655 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
@@ -672,24 +672,6 @@ export const UseStructureServiceStructureDataKeyFn = ({
dagId, externalDependenc
root?: string;
versionNumber?: number;
}, queryKey?: Array<unknown>) => [useStructureServiceStructureDataKey,
...(queryKey ?? [{ dagId, externalDependencies, includeDownstream,
includeUpstream, root, versionNumber }])];
-export type GridServiceGridDataDefaultResponse = Awaited<ReturnType<typeof
GridService.gridData>>;
-export type GridServiceGridDataQueryResult<TData =
GridServiceGridDataDefaultResponse, TError = unknown> = UseQueryResult<TData,
TError>;
-export const useGridServiceGridDataKey = "GridServiceGridData";
-export const UseGridServiceGridDataKeyFn = ({ dagId, includeDownstream,
includeUpstream, limit, logicalDateGte, logicalDateLte, offset, orderBy, root,
runAfterGte, runAfterLte, runType, state }: {
- dagId: string;
- includeDownstream?: boolean;
- includeUpstream?: boolean;
- limit?: number;
- logicalDateGte?: string;
- logicalDateLte?: string;
- offset?: number;
- orderBy?: string;
- root?: string;
- runAfterGte?: string;
- runAfterLte?: string;
- runType?: string[];
- state?: string[];
-}, queryKey?: Array<unknown>) => [useGridServiceGridDataKey, ...(queryKey ??
[{ dagId, includeDownstream, includeUpstream, limit, logicalDateGte,
logicalDateLte, offset, orderBy, root, runAfterGte, runAfterLte, runType, state
}])];
export type GridServiceGetDagStructureDefaultResponse =
Awaited<ReturnType<typeof GridService.getDagStructure>>;
export type GridServiceGetDagStructureQueryResult<TData =
GridServiceGetDagStructureDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
export const useGridServiceGetDagStructureKey = "GridServiceGetDagStructure";
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
index 7ccd7ca1a9d..1c0fc86697c 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
@@ -1265,41 +1265,6 @@ export const ensureUseStructureServiceStructureDataData
= (queryClient: QueryCli
versionNumber?: number;
}) => queryClient.ensureQueryData({ queryKey:
Common.UseStructureServiceStructureDataKeyFn({ dagId, externalDependencies,
includeDownstream, includeUpstream, root, versionNumber }), queryFn: () =>
StructureService.structureData({ dagId, externalDependencies,
includeDownstream, includeUpstream, root, versionNumber }) });
/**
-* Grid Data
-* Return grid data.
-* @param data The data for the request.
-* @param data.dagId
-* @param data.includeUpstream
-* @param data.includeDownstream
-* @param data.root
-* @param data.offset
-* @param data.runType
-* @param data.state
-* @param data.limit
-* @param data.orderBy
-* @param data.runAfterGte
-* @param data.runAfterLte
-* @param data.logicalDateGte
-* @param data.logicalDateLte
-* @returns GridResponse Successful Response
-* @throws ApiError
-*/
-export const ensureUseGridServiceGridDataData = (queryClient: QueryClient, {
dagId, includeDownstream, includeUpstream, limit, logicalDateGte,
logicalDateLte, offset, orderBy, root, runAfterGte, runAfterLte, runType, state
}: {
- dagId: string;
- includeDownstream?: boolean;
- includeUpstream?: boolean;
- limit?: number;
- logicalDateGte?: string;
- logicalDateLte?: string;
- offset?: number;
- orderBy?: string;
- root?: string;
- runAfterGte?: string;
- runAfterLte?: string;
- runType?: string[];
- state?: string[];
-}) => queryClient.ensureQueryData({ queryKey:
Common.UseGridServiceGridDataKeyFn({ dagId, includeDownstream, includeUpstream,
limit, logicalDateGte, logicalDateLte, offset, orderBy, root, runAfterGte,
runAfterLte, runType, state }), queryFn: () => GridService.gridData({ dagId,
includeDownstream, includeUpstream, limit, logicalDateGte, logicalDateLte,
offset, orderBy, root, runAfterGte, runAfterLte, runType, state }) });
-/**
* Get Dag Structure
* Return dag structure for grid view.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
index 60fc346fdc6..d220cf4d195 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -1265,41 +1265,6 @@ export const prefetchUseStructureServiceStructureData =
(queryClient: QueryClien
versionNumber?: number;
}) => queryClient.prefetchQuery({ queryKey:
Common.UseStructureServiceStructureDataKeyFn({ dagId, externalDependencies,
includeDownstream, includeUpstream, root, versionNumber }), queryFn: () =>
StructureService.structureData({ dagId, externalDependencies,
includeDownstream, includeUpstream, root, versionNumber }) });
/**
-* Grid Data
-* Return grid data.
-* @param data The data for the request.
-* @param data.dagId
-* @param data.includeUpstream
-* @param data.includeDownstream
-* @param data.root
-* @param data.offset
-* @param data.runType
-* @param data.state
-* @param data.limit
-* @param data.orderBy
-* @param data.runAfterGte
-* @param data.runAfterLte
-* @param data.logicalDateGte
-* @param data.logicalDateLte
-* @returns GridResponse Successful Response
-* @throws ApiError
-*/
-export const prefetchUseGridServiceGridData = (queryClient: QueryClient, {
dagId, includeDownstream, includeUpstream, limit, logicalDateGte,
logicalDateLte, offset, orderBy, root, runAfterGte, runAfterLte, runType, state
}: {
- dagId: string;
- includeDownstream?: boolean;
- includeUpstream?: boolean;
- limit?: number;
- logicalDateGte?: string;
- logicalDateLte?: string;
- offset?: number;
- orderBy?: string;
- root?: string;
- runAfterGte?: string;
- runAfterLte?: string;
- runType?: string[];
- state?: string[];
-}) => queryClient.prefetchQuery({ queryKey:
Common.UseGridServiceGridDataKeyFn({ dagId, includeDownstream, includeUpstream,
limit, logicalDateGte, logicalDateLte, offset, orderBy, root, runAfterGte,
runAfterLte, runType, state }), queryFn: () => GridService.gridData({ dagId,
includeDownstream, includeUpstream, limit, logicalDateGte, logicalDateLte,
offset, orderBy, root, runAfterGte, runAfterLte, runType, state }) });
-/**
* Get Dag Structure
* Return dag structure for grid view.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index d2a6abe0a6b..8a7ffc05251 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -1265,41 +1265,6 @@ export const useStructureServiceStructureData = <TData =
Common.StructureService
versionNumber?: number;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseStructureServiceStructureDataKeyFn({ dagId, externalDependencies,
includeDownstream, includeUpstream, root, versionNumber }, queryKey), queryFn:
() => StructureService.structureData({ dagId, externalDependencies,
includeDownstream, includeUpstream, root, versionNumber }) as TData, ...options
});
/**
-* Grid Data
-* Return grid data.
-* @param data The data for the request.
-* @param data.dagId
-* @param data.includeUpstream
-* @param data.includeDownstream
-* @param data.root
-* @param data.offset
-* @param data.runType
-* @param data.state
-* @param data.limit
-* @param data.orderBy
-* @param data.runAfterGte
-* @param data.runAfterLte
-* @param data.logicalDateGte
-* @param data.logicalDateLte
-* @returns GridResponse Successful Response
-* @throws ApiError
-*/
-export const useGridServiceGridData = <TData =
Common.GridServiceGridDataDefaultResponse, TError = unknown, TQueryKey extends
Array<unknown> = unknown[]>({ dagId, includeDownstream, includeUpstream, limit,
logicalDateGte, logicalDateLte, offset, orderBy, root, runAfterGte,
runAfterLte, runType, state }: {
- dagId: string;
- includeDownstream?: boolean;
- includeUpstream?: boolean;
- limit?: number;
- logicalDateGte?: string;
- logicalDateLte?: string;
- offset?: number;
- orderBy?: string;
- root?: string;
- runAfterGte?: string;
- runAfterLte?: string;
- runType?: string[];
- state?: string[];
-}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseGridServiceGridDataKeyFn({ dagId, includeDownstream, includeUpstream,
limit, logicalDateGte, logicalDateLte, offset, orderBy, root, runAfterGte,
runAfterLte, runType, state }, queryKey), queryFn: () => GridService.gridData({
dagId, includeDownstream, includeUpstream, limit, logicalDateGte,
logicalDateLte, offset, orderBy, root, runAfterGte, run [...]
-/**
* Get Dag Structure
* Return dag structure for grid view.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
index a8b84a5a173..57f12caea75 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
@@ -1265,41 +1265,6 @@ export const useStructureServiceStructureDataSuspense =
<TData = Common.Structur
versionNumber?: number;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseStructureServiceStructureDataKeyFn({ dagId, externalDependencies,
includeDownstream, includeUpstream, root, versionNumber }, queryKey), queryFn:
() => StructureService.structureData({ dagId, externalDependencies,
includeDownstream, includeUpstream, root, versionNumber }) as TData, ...options
});
/**
-* Grid Data
-* Return grid data.
-* @param data The data for the request.
-* @param data.dagId
-* @param data.includeUpstream
-* @param data.includeDownstream
-* @param data.root
-* @param data.offset
-* @param data.runType
-* @param data.state
-* @param data.limit
-* @param data.orderBy
-* @param data.runAfterGte
-* @param data.runAfterLte
-* @param data.logicalDateGte
-* @param data.logicalDateLte
-* @returns GridResponse Successful Response
-* @throws ApiError
-*/
-export const useGridServiceGridDataSuspense = <TData =
Common.GridServiceGridDataDefaultResponse, TError = unknown, TQueryKey extends
Array<unknown> = unknown[]>({ dagId, includeDownstream, includeUpstream, limit,
logicalDateGte, logicalDateLte, offset, orderBy, root, runAfterGte,
runAfterLte, runType, state }: {
- dagId: string;
- includeDownstream?: boolean;
- includeUpstream?: boolean;
- limit?: number;
- logicalDateGte?: string;
- logicalDateLte?: string;
- offset?: number;
- orderBy?: string;
- root?: string;
- runAfterGte?: string;
- runAfterLte?: string;
- runType?: string[];
- state?: string[];
-}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseGridServiceGridDataKeyFn({ dagId, includeDownstream, includeUpstream,
limit, logicalDateGte, logicalDateLte, offset, orderBy, root, runAfterGte,
runAfterLte, runType, state }, queryKey), queryFn: () => GridService.gridData({
dagId, includeDownstream, includeUpstream, limit, logicalDateGte,
logicalDateLte, offset, orderBy, root, runAfter [...]
-/**
* Get Dag Structure
* Return dag structure for grid view.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 522cb366a74..8089971c150 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -6740,120 +6740,6 @@ export const $ExtraMenuItem = {
title: 'ExtraMenuItem'
} as const;
-export const $GridDAGRunwithTIs = {
- properties: {
- dag_run_id: {
- type: 'string',
- title: 'Dag Run Id'
- },
- queued_at: {
- anyOf: [
- {
- type: 'string',
- format: 'date-time'
- },
- {
- type: 'null'
- }
- ],
- title: 'Queued At'
- },
- start_date: {
- anyOf: [
- {
- type: 'string',
- format: 'date-time'
- },
- {
- type: 'null'
- }
- ],
- title: 'Start Date'
- },
- end_date: {
- anyOf: [
- {
- type: 'string',
- format: 'date-time'
- },
- {
- type: 'null'
- }
- ],
- title: 'End Date'
- },
- run_after: {
- type: 'string',
- format: 'date-time',
- title: 'Run After'
- },
- state: {
- '$ref': '#/components/schemas/DagRunState'
- },
- run_type: {
- '$ref': '#/components/schemas/DagRunType'
- },
- logical_date: {
- anyOf: [
- {
- type: 'string',
- format: 'date-time'
- },
- {
- type: 'null'
- }
- ],
- title: 'Logical Date'
- },
- data_interval_start: {
- anyOf: [
- {
- type: 'string',
- format: 'date-time'
- },
- {
- type: 'null'
- }
- ],
- title: 'Data Interval Start'
- },
- data_interval_end: {
- anyOf: [
- {
- type: 'string',
- format: 'date-time'
- },
- {
- type: 'null'
- }
- ],
- title: 'Data Interval End'
- },
- note: {
- anyOf: [
- {
- type: 'string'
- },
- {
- type: 'null'
- }
- ],
- title: 'Note'
- },
- task_instances: {
- items: {
- '$ref': '#/components/schemas/GridTaskInstanceSummary'
- },
- type: 'array',
- title: 'Task Instances'
- }
- },
- type: 'object',
- required: ['dag_run_id', 'queued_at', 'start_date', 'end_date',
'run_after', 'state', 'run_type', 'logical_date', 'data_interval_start',
'data_interval_end', 'note', 'task_instances'],
- title: 'GridDAGRunwithTIs',
- description: 'DAG Run model for the Grid UI.'
-} as const;
-
export const $GridNodeResponse = {
properties: {
id: {
@@ -6908,22 +6794,6 @@ export const $GridNodeResponse = {
description: 'Base Node serializer for responses.'
} as const;
-export const $GridResponse = {
- properties: {
- dag_runs: {
- items: {
- '$ref': '#/components/schemas/GridDAGRunwithTIs'
- },
- type: 'array',
- title: 'Dag Runs'
- }
- },
- type: 'object',
- required: ['dag_runs'],
- title: 'GridResponse',
- description: 'Response model for the Grid UI.'
-} as const;
-
export const $GridRunsResponse = {
properties: {
dag_id: {
@@ -7031,98 +6901,6 @@ export const $GridTISummaries = {
description: 'DAG Run model for the Grid UI.'
} as const;
-export const $GridTaskInstanceSummary = {
- properties: {
- task_id: {
- type: 'string',
- title: 'Task Id'
- },
- try_number: {
- type: 'integer',
- title: 'Try Number'
- },
- start_date: {
- anyOf: [
- {
- type: 'string',
- format: 'date-time'
- },
- {
- type: 'null'
- }
- ],
- title: 'Start Date'
- },
- end_date: {
- anyOf: [
- {
- type: 'string',
- format: 'date-time'
- },
- {
- type: 'null'
- }
- ],
- title: 'End Date'
- },
- queued_dttm: {
- anyOf: [
- {
- type: 'string',
- format: 'date-time'
- },
- {
- type: 'null'
- }
- ],
- title: 'Queued Dttm'
- },
- child_states: {
- anyOf: [
- {
- additionalProperties: {
- type: 'integer'
- },
- type: 'object'
- },
- {
- type: 'null'
- }
- ],
- title: 'Child States'
- },
- task_count: {
- type: 'integer',
- title: 'Task Count'
- },
- state: {
- anyOf: [
- {
- '$ref': '#/components/schemas/TaskInstanceState'
- },
- {
- type: 'null'
- }
- ]
- },
- note: {
- anyOf: [
- {
- type: 'string'
- },
- {
- type: 'null'
- }
- ],
- title: 'Note'
- }
- },
- type: 'object',
- required: ['task_id', 'try_number', 'start_date', 'end_date',
'queued_dttm', 'child_states', 'task_count', 'state', 'note'],
- title: 'GridTaskInstanceSummary',
- description: 'Task Instance Summary model for the Grid UI.'
-} as const;
-
export const $HistoricalMetricDataResponse = {
properties: {
dag_run_types: {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index ba77ae06680..321a708f6b3 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -3,7 +3,7 @@
import type { CancelablePromise } from './core/CancelablePromise';
import { OpenAPI } from './core/OpenAPI';
import { request as __request } from './core/request';
-import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData,
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse,
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData,
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse,
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse,
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData,
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse,
Dele [...]
+import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData,
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse,
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData,
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse,
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse,
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData,
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse,
Dele [...]
export class AssetService {
/**
@@ -3554,55 +3554,6 @@ export class StructureService {
}
export class GridService {
- /**
- * Grid Data
- * Return grid data.
- * @param data The data for the request.
- * @param data.dagId
- * @param data.includeUpstream
- * @param data.includeDownstream
- * @param data.root
- * @param data.offset
- * @param data.runType
- * @param data.state
- * @param data.limit
- * @param data.orderBy
- * @param data.runAfterGte
- * @param data.runAfterLte
- * @param data.logicalDateGte
- * @param data.logicalDateLte
- * @returns GridResponse Successful Response
- * @throws ApiError
- */
- public static gridData(data: GridDataData):
CancelablePromise<GridDataResponse> {
- return __request(OpenAPI, {
- method: 'GET',
- url: '/ui/grid/{dag_id}',
- path: {
- dag_id: data.dagId
- },
- query: {
- include_upstream: data.includeUpstream,
- include_downstream: data.includeDownstream,
- root: data.root,
- offset: data.offset,
- run_type: data.runType,
- state: data.state,
- limit: data.limit,
- order_by: data.orderBy,
- run_after_gte: data.runAfterGte,
- run_after_lte: data.runAfterLte,
- logical_date_gte: data.logicalDateGte,
- logical_date_lte: data.logicalDateLte
- },
- errors: {
- 400: 'Bad Request',
- 404: 'Not Found',
- 422: 'Validation Error'
- }
- });
- }
-
/**
* Get Dag Structure
* Return dag structure for grid view.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 80b644ce63f..733e2985098 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1730,24 +1730,6 @@ export type ExtraMenuItem = {
href: string;
};
-/**
- * DAG Run model for the Grid UI.
- */
-export type GridDAGRunwithTIs = {
- dag_run_id: string;
- queued_at: string | null;
- start_date: string | null;
- end_date: string | null;
- run_after: string;
- state: DagRunState;
- run_type: DagRunType;
- logical_date: string | null;
- data_interval_start: string | null;
- data_interval_end: string | null;
- note: string | null;
- task_instances: Array<GridTaskInstanceSummary>;
-};
-
/**
* Base Node serializer for responses.
*/
@@ -1759,13 +1741,6 @@ export type GridNodeResponse = {
setup_teardown_type?: 'setup' | 'teardown' | null;
};
-/**
- * Response model for the Grid UI.
- */
-export type GridResponse = {
- dag_runs: Array<GridDAGRunwithTIs>;
-};
-
/**
* Base Node serializer for responses.
*/
@@ -1790,23 +1765,6 @@ export type GridTISummaries = {
task_instances: Array<LightGridTaskInstanceSummary>;
};
-/**
- * Task Instance Summary model for the Grid UI.
- */
-export type GridTaskInstanceSummary = {
- task_id: string;
- try_number: number;
- start_date: string | null;
- end_date: string | null;
- queued_dttm: string | null;
- child_states: {
- [key: string]: (number);
-} | null;
- task_count: number;
- state: TaskInstanceState | null;
- note: string | null;
-};
-
/**
* Historical Metric Data serializer for responses.
*/
@@ -2933,24 +2891,6 @@ export type StructureDataData = {
export type StructureDataResponse2 = StructureDataResponse;
-export type GridDataData = {
- dagId: string;
- includeDownstream?: boolean;
- includeUpstream?: boolean;
- limit?: number;
- logicalDateGte?: string | null;
- logicalDateLte?: string | null;
- offset?: number;
- orderBy?: string;
- root?: string | null;
- runAfterGte?: string | null;
- runAfterLte?: string | null;
- runType?: Array<(string)>;
- state?: Array<(string)>;
-};
-
-export type GridDataResponse = GridResponse;
-
export type GetDagStructureData = {
dagId: string;
limit?: number;
@@ -5988,29 +5928,6 @@ export type $OpenApiTs = {
};
};
};
- '/ui/grid/{dag_id}': {
- get: {
- req: GridDataData;
- res: {
- /**
- * Successful Response
- */
- 200: GridResponse;
- /**
- * Bad Request
- */
- 400: HTTPExceptionResponse;
- /**
- * Not Found
- */
- 404: HTTPExceptionResponse;
- /**
- * Validation Error
- */
- 422: HTTPValidationError;
- };
- };
- };
'/ui/grid/structure/{dag_id}': {
get: {
req: GetDagStructureData;
diff --git a/airflow-core/src/airflow/ui/src/components/Graph/TaskNode.tsx
b/airflow-core/src/airflow/ui/src/components/Graph/TaskNode.tsx
index 9aa68ceafcc..a3c7f688e79 100644
--- a/airflow-core/src/airflow/ui/src/components/Graph/TaskNode.tsx
+++ b/airflow-core/src/airflow/ui/src/components/Graph/TaskNode.tsx
@@ -18,8 +18,8 @@
*/
import { Box, Button, Flex, HStack, LinkOverlay, Text } from
"@chakra-ui/react";
import type { NodeProps, Node as NodeType } from "@xyflow/react";
+import { useMemo } from "react";
import { useTranslation } from "react-i18next";
-import { CgRedo } from "react-icons/cg";
import { StateBadge } from "src/components/StateBadge";
import TaskInstanceTooltip from "src/components/TaskInstanceTooltip";
@@ -53,6 +53,13 @@ export const TaskNode = ({
toggleGroupId(id);
}
};
+ const thisChildCount = useMemo(
+ () =>
+ Object.entries(taskInstance?.child_states ?? {})
+ .map(([_state, count]) => count)
+ .reduce((sum, val) => sum + val, 0),
+ [taskInstance],
+ );
return (
<NodeWrapper>
@@ -82,7 +89,7 @@ export const TaskNode = ({
>
<LinkOverlay asChild>
<TaskLink
- childCount={taskInstance?.task_count}
+ childCount={thisChildCount}
id={id}
isGroup={isGroup}
isMapped={isMapped}
@@ -106,7 +113,6 @@ export const TaskNode = ({
<StateBadge fontSize="xs" state={taskInstance.state}>
{taskInstance.state}
</StateBadge>
- {taskInstance.try_number > 1 ? <CgRedo /> : undefined}
</HStack>
)}
{isGroup ? (
diff --git a/airflow-core/src/airflow/ui/src/components/Graph/reactflowUtils.ts
b/airflow-core/src/airflow/ui/src/components/Graph/reactflowUtils.ts
index cf704be094a..dcfd4dcc41f 100644
--- a/airflow-core/src/airflow/ui/src/components/Graph/reactflowUtils.ts
+++ b/airflow-core/src/airflow/ui/src/components/Graph/reactflowUtils.ts
@@ -19,7 +19,7 @@
import type { Node as FlowNodeType, Edge as FlowEdgeType } from
"@xyflow/react";
import type { ElkExtendedEdge } from "elkjs";
-import type { GridTaskInstanceSummary, NodeResponse } from
"openapi/requests/types.gen";
+import type { LightGridTaskInstanceSummary, NodeResponse } from
"openapi/requests/types.gen";
import type { LayoutNode } from "./useGraphLayout";
@@ -36,7 +36,7 @@ export type CustomNodeProps = {
label: string;
operator?: string | null;
setupTeardownType?: NodeResponse["setup_teardown_type"];
- taskInstance?: GridTaskInstanceSummary;
+ taskInstance?: LightGridTaskInstanceSummary;
type: string;
width?: number;
};
diff --git a/airflow-core/src/airflow/ui/src/components/TaskInstanceTooltip.tsx
b/airflow-core/src/airflow/ui/src/components/TaskInstanceTooltip.tsx
index 4ecdc6fc3c2..ae5df5b5a97 100644
--- a/airflow-core/src/airflow/ui/src/components/TaskInstanceTooltip.tsx
+++ b/airflow-core/src/airflow/ui/src/components/TaskInstanceTooltip.tsx
@@ -20,16 +20,16 @@ import { Box, Text } from "@chakra-ui/react";
import { useTranslation } from "react-i18next";
import type {
+ LightGridTaskInstanceSummary,
TaskInstanceHistoryResponse,
TaskInstanceResponse,
- GridTaskInstanceSummary,
} from "openapi/requests/types.gen";
import Time from "src/components/Time";
import { Tooltip, type TooltipProps } from "src/components/ui";
import { getDuration } from "src/utils";
type Props = {
- readonly taskInstance?: GridTaskInstanceSummary |
TaskInstanceHistoryResponse | TaskInstanceResponse;
+ readonly taskInstance?: LightGridTaskInstanceSummary |
TaskInstanceHistoryResponse | TaskInstanceResponse;
} & Omit<TooltipProps, "content">;
const TaskInstanceTooltip = ({ children, positioning, taskInstance, ...rest }:
Props) => {
@@ -50,21 +50,23 @@ const TaskInstanceTooltip = ({ children, positioning,
taskInstance, ...rest }: P
{translate("runId")}: {taskInstance.dag_run_id}
</Text>
) : undefined}
- <Text>
- {translate("startDate")}: <Time datetime={taskInstance.start_date}
/>
- </Text>
- <Text>
- {translate("endDate")}: <Time datetime={taskInstance.end_date} />
- </Text>
- {taskInstance.try_number > 1 && (
- <Text>
- {translate("tryNumber")}: {taskInstance.try_number}
- </Text>
- )}
{"start_date" in taskInstance ? (
- <Text>
- {translate("duration")}: {getDuration(taskInstance.start_date,
taskInstance.end_date)}
- </Text>
+ <>
+ {taskInstance.try_number > 1 && (
+ <Text>
+ {translate("tryNumber")}: {taskInstance.try_number}
+ </Text>
+ )}
+ <Text>
+ {translate("startDate")}: <Time
datetime={taskInstance.start_date} />
+ </Text>
+ <Text>
+ {translate("endDate")}: <Time datetime={taskInstance.end_date}
/>
+ </Text>
+ <Text>
+ {translate("duration")}: {getDuration(taskInstance.start_date,
taskInstance.end_date)}
+ </Text>
+ </>
) : undefined}
</Box>
}
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/DetailsLayout.tsx
b/airflow-core/src/airflow/ui/src/layouts/Details/DetailsLayout.tsx
index a1dba8f668a..887f2b7406c 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/DetailsLayout.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/DetailsLayout.tsx
@@ -28,7 +28,6 @@ import { Outlet, useParams } from "react-router-dom";
import { useLocalStorage } from "usehooks-ts";
import { useDagServiceGetDag, useDagWarningServiceListDagWarnings } from
"openapi/queries";
-import type { DAGResponse } from "openapi/requests/types.gen";
import BackfillBanner from "src/components/Banner/BackfillBanner";
import { SearchDagsButton } from "src/components/SearchDags";
import TriggerDAGButton from "src/components/TriggerDag/TriggerDAGButton";
@@ -46,7 +45,6 @@ import { NavTabs } from "./NavTabs";
import { PanelButtons } from "./PanelButtons";
type Props = {
- readonly dag?: DAGResponse;
readonly error?: unknown;
readonly isLoading?: boolean;
readonly tabs: Array<{ icon: ReactNode; label: string; value: string }>;
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
b/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
index 4837e18d401..2d24b95a408 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Graph/Graph.tsx
@@ -22,11 +22,7 @@ import "@xyflow/react/dist/style.css";
import { useParams } from "react-router-dom";
import { useLocalStorage } from "usehooks-ts";
-import {
- useDagRunServiceGetDagRun,
- useGridServiceGridData,
- useStructureServiceStructureData,
-} from "openapi/queries";
+import { useStructureServiceStructureData } from "openapi/queries";
import { DownloadButton } from "src/components/Graph/DownloadButton";
import { edgeTypes, nodeTypes } from "src/components/Graph/graphTypes";
import type { CustomNodeProps } from "src/components/Graph/reactflowUtils";
@@ -35,7 +31,7 @@ import { useColorMode } from "src/context/colorMode";
import { useOpenGroups } from "src/context/openGroups";
import useSelectedVersion from "src/hooks/useSelectedVersion";
import { useDependencyGraph } from "src/queries/useDependencyGraph";
-import { isStatePending, useAutoRefresh } from "src/utils";
+import { useGridTiSummaries } from "src/queries/useGridTISummaries.ts";
const nodeColor = (
{ data: { depth, height, isOpen, taskInstance, width }, type }:
ReactFlowNode<CustomNodeProps>,
@@ -76,7 +72,6 @@ export const Graph = () => {
]);
const { openGroupIds } = useOpenGroups();
- const refetchInterval = useAutoRefresh({ dagId });
const [dependencies] = useLocalStorage<"all" | "immediate" |
"tasks">(`dependencies-${dagId}`, "tasks");
const [direction] = useLocalStorage<Direction>(`direction-${dagId}`,
"RIGHT");
@@ -93,15 +88,6 @@ export const Graph = () => {
enabled: dependencies === "all",
});
- const { data: dagRun } = useDagRunServiceGetDagRun(
- {
- dagId,
- dagRunId: runId,
- },
- undefined,
- { enabled: runId !== "" },
- );
-
const dagDepEdges = dependencies === "all" ? dagDependencies.edges : [];
const dagDepNodes = dependencies === "all" ? dagDependencies.nodes : [];
@@ -117,28 +103,11 @@ export const Graph = () => {
versionNumber: selectedVersion,
});
- // Filter grid data to get only a single dag run
- const { data: gridData } = useGridServiceGridData(
- {
- dagId,
- limit: 1,
- offset: 0,
- runAfterGte: dagRun?.run_after,
- runAfterLte: dagRun?.run_after,
- },
- undefined,
- {
- enabled: dagRun !== undefined,
- refetchInterval: (query) =>
- query.state.data?.dag_runs.some((dr) => isStatePending(dr.state)) &&
refetchInterval,
- },
- );
-
- const gridRun = gridData?.dag_runs.find((dr) => dr.dag_run_id === runId);
+ const { data: gridTISummaries } = useGridTiSummaries({ dagId, runId });
// Add task instances to the node data but without having to recalculate how
the graph is laid out
const nodes = data?.nodes.map((node) => {
- const taskInstance = gridRun?.task_instances.find((ti) => ti.task_id ===
node.id);
+ const taskInstance = gridTISummaries?.task_instances.find((ti) =>
ti.task_id === node.id);
return {
...node,
diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/utils.ts
b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/utils.ts
index d278597267a..6901287d04c 100644
--- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/utils.ts
+++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/utils.ts
@@ -16,11 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-import type { GridDAGRunwithTIs, GridNodeResponse } from
"openapi/requests/types.gen";
-
-export type RunWithDuration = {
- duration: number;
-} & GridDAGRunwithTIs;
+import type { GridNodeResponse } from "openapi/requests/types.gen";
export type GridTask = {
depth: number;
diff --git
a/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx
b/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx
index 212e9d6a187..7f5f212469d 100644
--- a/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx
@@ -30,7 +30,6 @@ import {
useDagServiceGetDagDetails,
useDagServiceGetDagsUiKey,
useDependenciesServiceGetDependencies,
- UseGridServiceGridDataKeyFn,
UseTaskInstanceServiceGetTaskInstancesKeyFn,
} from "openapi/queries";
import type {
@@ -105,7 +104,6 @@ export const CreateAssetEventModal = ({ asset, onClose,
open }: Props) => {
[useDagServiceGetDagsUiKey],
UseDagRunServiceGetDagRunsKeyFn({ dagId }, [{ dagId }]),
UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId: "~" },
[{ dagId, dagRunId: "~" }]),
- UseGridServiceGridDataKeyFn({ dagId }, [{ dagId }]),
];
toaster.create({
diff --git a/airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx
b/airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx
index 877f57ff795..dbc9b84e12e 100644
--- a/airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx
@@ -98,12 +98,7 @@ export const Dag = () => {
return (
<ReactFlowProvider>
- <DetailsLayout
- dag={dag}
- error={error ?? runsError}
- isLoading={isLoading || isLoadingRuns}
- tabs={displayTabs}
- >
+ <DetailsLayout error={error ?? runsError} isLoading={isLoading ||
isLoadingRuns} tabs={displayTabs}>
<Header
dag={dag}
dagWithRuns={dagWithRuns}
diff --git a/airflow-core/src/airflow/ui/src/pages/Run/Run.tsx
b/airflow-core/src/airflow/ui/src/pages/Run/Run.tsx
index c9326497b3f..776e4e379cd 100644
--- a/airflow-core/src/airflow/ui/src/pages/Run/Run.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Run/Run.tsx
@@ -22,7 +22,7 @@ import { FiCode, FiDatabase } from "react-icons/fi";
import { MdDetails, MdOutlineEventNote, MdOutlineTask } from "react-icons/md";
import { useParams } from "react-router-dom";
-import { useDagRunServiceGetDagRun, useDagServiceGetDagDetails } from
"openapi/queries";
+import { useDagRunServiceGetDagRun } from "openapi/queries";
import { usePluginTabs } from "src/hooks/usePluginTabs";
import { DetailsLayout } from "src/layouts/Details/DetailsLayout";
import { isStatePending, useAutoRefresh } from "src/utils";
@@ -47,14 +47,6 @@ export const Run = () => {
const refetchInterval = useAutoRefresh({ dagId });
- const {
- data: dag,
- error: dagError,
- isLoading: isLoadinDag,
- } = useDagServiceGetDagDetails({
- dagId,
- });
-
const {
data: dagRun,
error,
@@ -72,7 +64,7 @@ export const Run = () => {
return (
<ReactFlowProvider>
- <DetailsLayout dag={dag} error={error ?? dagError} isLoading={isLoading
|| isLoadinDag} tabs={tabs}>
+ <DetailsLayout error={error} isLoading={isLoading} tabs={tabs}>
{dagRun === undefined ? undefined : (
<Header
dagRun={dagRun}
diff --git a/airflow-core/src/airflow/ui/src/pages/Task/Task.tsx
b/airflow-core/src/airflow/ui/src/pages/Task/Task.tsx
index c832c620434..d37e91fe9a9 100644
--- a/airflow-core/src/airflow/ui/src/pages/Task/Task.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Task/Task.tsx
@@ -22,7 +22,7 @@ import { LuChartColumn } from "react-icons/lu";
import { MdOutlineEventNote, MdOutlineTask } from "react-icons/md";
import { useParams } from "react-router-dom";
-import { useDagServiceGetDagDetails, useTaskServiceGetTask } from
"openapi/queries";
+import { useTaskServiceGetTask } from "openapi/queries";
import { usePluginTabs } from "src/hooks/usePluginTabs";
import { DetailsLayout } from "src/layouts/Details/DetailsLayout";
import { useGridStructure } from "src/queries/useGridStructure.ts";
@@ -59,22 +59,9 @@ export const Task = () => {
const groupTask = getGroupTask(dagStructure, groupId);
- const {
- data: dag,
- error: dagError,
- isLoading: isDagLoading,
- } = useDagServiceGetDagDetails({
- dagId,
- });
-
return (
<ReactFlowProvider>
- <DetailsLayout
- dag={dag}
- error={error ?? dagError}
- isLoading={isLoading || isDagLoading}
- tabs={displayTabs}
- >
+ <DetailsLayout error={error} isLoading={isLoading} tabs={displayTabs}>
{task === undefined ? undefined : <Header task={task} />}
{groupTask ? <GroupTaskHeader title={groupTask.label} /> : undefined}
</DetailsLayout>
diff --git
a/airflow-core/src/airflow/ui/src/pages/TaskInstance/TaskInstance.tsx
b/airflow-core/src/airflow/ui/src/pages/TaskInstance/TaskInstance.tsx
index 83c6ae7e47d..f1a7c490d28 100644
--- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/TaskInstance.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/TaskInstance/TaskInstance.tsx
@@ -17,19 +17,17 @@
* under the License.
*/
import { ReactFlowProvider } from "@xyflow/react";
+import { useMemo } from "react";
import { useTranslation } from "react-i18next";
import { FiCode, FiDatabase } from "react-icons/fi";
import { MdDetails, MdOutlineEventNote, MdOutlineTask, MdReorder, MdSyncAlt }
from "react-icons/md";
import { PiBracketsCurlyBold } from "react-icons/pi";
import { useParams } from "react-router-dom";
-import {
- useDagServiceGetDagDetails,
- useGridServiceGridData,
- useTaskInstanceServiceGetMappedTaskInstance,
-} from "openapi/queries";
+import { useTaskInstanceServiceGetMappedTaskInstance } from "openapi/queries";
import { usePluginTabs } from "src/hooks/usePluginTabs";
import { DetailsLayout } from "src/layouts/Details/DetailsLayout";
+import { useGridTiSummaries } from "src/queries/useGridTISummaries.ts";
import { isStatePending, useAutoRefresh } from "src/utils";
import { Header } from "./Header";
@@ -58,14 +56,6 @@ export const TaskInstance = () => {
const refetchInterval = useAutoRefresh({ dagId });
- const {
- data: dag,
- error: dagError,
- isLoading: isDagLoading,
- } = useDagServiceGetDagDetails({
- dagId,
- });
-
const {
data: taskInstance,
error,
@@ -83,25 +73,18 @@ export const TaskInstance = () => {
},
);
- // Filter grid data to get only a single dag run
- const { data } = useGridServiceGridData(
- {
- dagId,
- limit: 1,
- offset: 0,
- runAfterGte: taskInstance?.run_after,
- runAfterLte: taskInstance?.run_after,
- },
- undefined,
- {
- enabled: taskInstance !== undefined,
- },
- );
-
- const mappedTaskInstance = data?.dag_runs
- .find((dr) => dr.dag_run_id === runId)
- ?.task_instances.find((ti) => ti.task_id === taskId);
+ const { data: gridTISummaries } = useGridTiSummaries({ dagId, runId });
+ const taskInstanceSummary = gridTISummaries?.task_instances.find((ti) =>
ti.task_id === taskId);
+  // Mapped-task total = sum of the numeric values held in child_states (null/undefined -> 0).
+  const taskCount = useMemo(
+    () =>
+      Object.values(taskInstanceSummary?.child_states ?? {}).reduce(
+        (acc: number, val: unknown) => acc + (typeof val === "number" ? val : 0),
+        0,
+      ),
+    [taskInstanceSummary],
+  );
let newTabs = tabs;
if (taskInstance && taskInstance.map_index > -1) {
@@ -110,7 +93,7 @@ export const TaskInstance = () => {
{
icon: <MdOutlineTask />,
label: translate("tabs.mappedTaskInstances_other", {
- count: Number(mappedTaskInstance?.task_count ?? 0),
+ count: Number(taskCount),
}),
value: "task_instances",
},
@@ -120,7 +103,7 @@ export const TaskInstance = () => {
return (
<ReactFlowProvider>
- <DetailsLayout dag={dag} error={error ?? dagError} isLoading={isLoading
|| isDagLoading} tabs={newTabs}>
+ <DetailsLayout error={error} isLoading={isLoading} tabs={newTabs}>
{taskInstance === undefined ? undefined : (
<Header
isRefreshing={Boolean(isStatePending(taskInstance.state) &&
Boolean(refetchInterval))}
diff --git a/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
b/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
index 921555f8dea..9f5d4c267d7 100644
--- a/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useClearRun.ts
@@ -24,7 +24,6 @@ import {
UseDagRunServiceGetDagRunKeyFn,
useDagRunServiceGetDagRunsKey,
UseDagServiceGetDagDetailsKeyFn,
- UseGridServiceGridDataKeyFn,
useTaskInstanceServiceGetTaskInstancesKey,
UseGridServiceGetGridRunsKeyFn,
UseGridServiceGetGridTiSummariesKeyFn,
@@ -60,7 +59,6 @@ export const useClearDagRun = ({
UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
[useDagRunServiceGetDagRunsKey],
[useClearDagRunDryRunKey, dagId],
- UseGridServiceGridDataKeyFn({ dagId }, [{ dagId }]),
UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }, [{
dagId, runId: dagRunId }]),
];
diff --git a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
index 7940b4204ee..ed3431bba08 100644
--- a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
@@ -22,7 +22,6 @@ import { useTranslation } from "react-i18next";
import {
UseDagRunServiceGetDagRunKeyFn,
useDagRunServiceGetDagRunsKey,
- UseGridServiceGridDataKeyFn,
UseTaskInstanceServiceGetMappedTaskInstanceKeyFn,
useTaskInstanceServicePostClearTaskInstances,
UseGridServiceGetGridRunsKeyFn,
@@ -85,7 +84,6 @@ export const useClearTaskInstances = ({
[useDagRunServiceGetDagRunsKey],
[useClearTaskInstancesDryRunKey, dagId],
[usePatchTaskInstanceDryRunKey, dagId, dagRunId],
- UseGridServiceGridDataKeyFn({ dagId }, [{ dagId }]),
UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }, [{
dagId, runId: dagRunId }]),
];
diff --git a/airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts
b/airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts
index 7f5c9e670ea..8e83952815c 100644
--- a/airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts
@@ -23,7 +23,6 @@ import {
useDagRunServiceDeleteDagRun,
useDagRunServiceGetDagRunsKey,
UseDagRunServiceGetDagRunKeyFn,
- UseGridServiceGridDataKeyFn,
} from "openapi/queries";
import { toaster } from "src/components/ui";
@@ -46,11 +45,7 @@ export const useDeleteDagRun = ({ dagId, dagRunId,
onSuccessConfirm }: DeleteDag
};
const onSuccess = async () => {
- const queryKeys = [
- UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
- [useDagRunServiceGetDagRunsKey],
- UseGridServiceGridDataKeyFn({ dagId }, [{ dagId }]),
- ];
+ const queryKeys = [UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
[useDagRunServiceGetDagRunsKey]];
await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({
queryKey: key })));
diff --git a/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
b/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
index cf84f7cc068..d2ec715a011 100644
--- a/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useDeleteTaskInstance.ts
@@ -23,7 +23,6 @@ import {
useTaskInstanceServiceDeleteTaskInstance,
useTaskInstanceServiceGetTaskInstanceKey,
useTaskInstanceServiceGetTaskInstancesKey,
- UseGridServiceGridDataKeyFn,
useDagRunServiceGetDagRunsKey,
UseDagRunServiceGetDagRunKeyFn,
} from "openapi/queries";
@@ -61,7 +60,6 @@ export const useDeleteTaskInstance = ({
[useDagRunServiceGetDagRunsKey],
[useTaskInstanceServiceGetTaskInstancesKey],
[useTaskInstanceServiceGetTaskInstanceKey, { dagId, dagRunId, mapIndex,
taskId }],
- UseGridServiceGridDataKeyFn({ dagId }, [{ dagId }]),
];
await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({
queryKey: key })));
diff --git a/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
b/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
index 2a93101875e..877719a0ab7 100644
--- a/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
@@ -38,6 +38,7 @@ export const useGridTiSummaries = ({
},
undefined,
{
+ enabled: Boolean(runId) && Boolean(dagId),
placeholderData: (prev) => prev,
refetchInterval: (query) =>
((state !== undefined && isStatePending(state)) ||
diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
b/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
index a5c2377bca7..25ea7fad646 100644
--- a/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
@@ -23,7 +23,6 @@ import {
UseDagRunServiceGetDagRunKeyFn,
useDagRunServiceGetDagRunsKey,
useDagRunServicePatchDagRun,
- UseGridServiceGridDataKeyFn,
useTaskInstanceServiceGetTaskInstancesKey,
UseGridServiceGetGridRunsKeyFn,
UseGridServiceGetGridTiSummariesKeyFn,
@@ -60,7 +59,6 @@ export const usePatchDagRun = ({
[useDagRunServiceGetDagRunsKey],
[useTaskInstanceServiceGetTaskInstancesKey, { dagId, dagRunId }],
[useClearDagRunDryRunKey, dagId],
- UseGridServiceGridDataKeyFn({ dagId }, [{ dagId }]),
UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }, [{
dagId, runId: dagRunId }]),
];
diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
b/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
index 1a08af5f7e8..bc2747b1a57 100644
--- a/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
+++ b/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts
@@ -20,7 +20,6 @@ import { useQueryClient } from "@tanstack/react-query";
import { useTranslation } from "react-i18next";
import {
- UseGridServiceGridDataKeyFn,
UseTaskInstanceServiceGetMappedTaskInstanceKeyFn,
UseTaskInstanceServiceGetTaskInstanceKeyFn,
useTaskInstanceServiceGetTaskInstancesKey,
@@ -66,7 +65,6 @@ export const usePatchTaskInstance = ({
[useTaskInstanceServiceGetTaskInstancesKey],
[usePatchTaskInstanceDryRunKey, dagId, dagRunId, { mapIndex, taskId }],
[useClearTaskInstancesDryRunKey, dagId],
- UseGridServiceGridDataKeyFn({ dagId }, [{ dagId }]),
UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }, [{
dagId, runId: dagRunId }]),
];
diff --git a/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts
b/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts
index 9f67c249d03..d47f3a6e35b 100644
--- a/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts
@@ -24,7 +24,6 @@ import {
UseDagRunServiceGetDagRunsKeyFn,
UseDagServiceGetDagDetailsKeyFn,
useDagServiceGetDagsUi,
- UseGridServiceGridDataKeyFn,
UseTaskInstanceServiceGetTaskInstancesKeyFn,
useGridServiceGetLatestRun,
UseGridServiceGetDagStructureKeyFn,
@@ -57,7 +56,6 @@ export const useRefreshOnNewDagRuns = (dagId: string,
hasPendingRuns: boolean |
UseDagServiceGetDagDetailsKeyFn({ dagId }, [{ dagId }]),
UseDagRunServiceGetDagRunsKeyFn({ dagId }, [{ dagId }]),
UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId: "~" },
[{ dagId, dagRunId: "~" }]),
- UseGridServiceGridDataKeyFn({ dagId }, [{ dagId }]),
UseGridServiceGetDagStructureKeyFn({ dagId }, [{ dagId }]),
UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
];
diff --git a/airflow-core/src/airflow/ui/src/queries/useTrigger.ts
b/airflow-core/src/airflow/ui/src/queries/useTrigger.ts
index 99f0a381c0e..307ea033da6 100644
--- a/airflow-core/src/airflow/ui/src/queries/useTrigger.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useTrigger.ts
@@ -24,7 +24,6 @@ import {
UseDagRunServiceGetDagRunsKeyFn,
useDagRunServiceTriggerDagRun,
useDagServiceGetDagsUiKey,
- UseGridServiceGridDataKeyFn,
UseTaskInstanceServiceGetTaskInstancesKeyFn,
UseGridServiceGetGridRunsKeyFn,
} from "openapi/queries";
@@ -40,7 +39,6 @@ export const useTrigger = ({ dagId, onSuccessConfirm }: {
dagId: string; onSucce
[useDagServiceGetDagsUiKey],
UseDagRunServiceGetDagRunsKeyFn({ dagId }, [{ dagId }]),
UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId: "~" }, [{
dagId, dagRunId: "~" }]),
- UseGridServiceGridDataKeyFn({ dagId }, [{ dagId }]),
UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]),
];
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
index cf2e561ec30..314c6bf8437 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
@@ -56,368 +56,56 @@ INNER_TASK_GROUP = "inner_task_group"
INNER_TASK_GROUP_SUB_TASK = "inner_task_group_sub_task"
GRID_RUN_1 = {
- "dag_run_id": "run_1",
- "data_interval_end": "2024-11-30T00:00:00Z",
- "data_interval_start": "2024-11-29T00:00:00Z",
+ "dag_id": "test_dag",
+ "duration": 0,
"end_date": "2024-12-31T00:00:00Z",
- "logical_date": "2024-11-30T00:00:00Z",
"run_after": "2024-11-30T00:00:00Z",
+ "run_id": "run_1",
"run_type": "scheduled",
"start_date": "2016-01-01T00:00:00Z",
"state": "success",
- "task_instances": [
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 3,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 3,
- "task_id": "mapped_task_group",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 2,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 2,
- "task_id": "task_group.inner_task_group",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 5,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 5,
- "task_id": "task_group",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 1,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 1,
- "task_id": "mapped_task_2",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 3,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 3,
- "task_id": "mapped_task_group.subtask",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 1,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 1,
- "task_id": "task",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 2,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 2,
- "task_id": "task_group.inner_task_group.inner_task_group_sub_task",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 4,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 4,
- "task_id": "task_group.mapped_task",
- "try_number": 0,
- },
- ],
}
GRID_RUN_2 = {
- "dag_run_id": "run_2",
- "data_interval_end": "2024-11-30T00:00:00Z",
- "data_interval_start": "2024-11-29T00:00:00Z",
+ "dag_id": "test_dag",
+ "duration": 0,
"end_date": "2024-12-31T00:00:00Z",
- "logical_date": "2024-12-01T00:00:00Z",
"run_after": "2024-11-30T00:00:00Z",
+ "run_id": "run_2",
"run_type": "manual",
"start_date": "2016-01-01T00:00:00Z",
"state": "failed",
- "task_instances": [
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 1,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 1,
- "scheduled": 0,
- "skipped": 0,
- "success": 1,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "end_date": "2024-12-30T01:02:03Z",
- "start_date": "2024-12-30T01:00:00Z",
- "state": "running",
- "task_count": 3,
- "task_id": "mapped_task_group",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 2,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 0,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "task_count": 2,
- "task_id": "task_group.inner_task_group",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 5,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 0,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "task_count": 5,
- "task_id": "task_group",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 1,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 0,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "task_count": 1,
- "task_id": "mapped_task_2",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 1,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 1,
- "scheduled": 0,
- "skipped": 0,
- "success": 1,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "end_date": "2024-12-30T01:02:03Z",
- "start_date": "2024-12-30T01:00:00Z",
- "state": "running",
- "task_count": 3,
- "task_id": "mapped_task_group.subtask",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 1,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 1,
- "task_id": "task",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 2,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 0,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "task_count": 2,
- "task_id": "task_group.inner_task_group.inner_task_group_sub_task",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 4,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 0,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "task_count": 4,
- "task_id": "task_group.mapped_task",
- "try_number": 0,
- },
- ],
}
+GRID_NODES = [
+ {
+ "children": [{"id": "mapped_task_group.subtask", "is_mapped": True,
"label": "subtask"}],
+ "id": "mapped_task_group",
+ "is_mapped": True,
+ "label": "mapped_task_group",
+ },
+ {"id": "task", "label": "task"},
+ {
+ "children": [
+ {
+ "children": [
+ {
+ "id":
"task_group.inner_task_group.inner_task_group_sub_task",
+ "is_mapped": True,
+ "label": "inner_task_group_sub_task",
+ }
+ ],
+ "id": "task_group.inner_task_group",
+ "label": "inner_task_group",
+ },
+ {"id": "task_group.mapped_task", "is_mapped": True, "label":
"mapped_task"},
+ ],
+ "id": "task_group",
+ "label": "task_group",
+ },
+ {"id": "mapped_task_2", "is_mapped": True, "label": "mapped_task_2"},
+]
+
@pytest.fixture(autouse=True, scope="module")
def examples_dag_bag():
@@ -582,286 +270,48 @@ def _freeze_time_for_dagruns(time_machine):
@pytest.mark.usefixtures("_freeze_time_for_dagruns")
class TestGetGridDataEndpoint:
def test_should_response_200(self, test_client):
- response = test_client.get(f"/grid/{DAG_ID}")
+ response = test_client.get(f"/grid/runs/{DAG_ID}")
assert response.status_code == 200
- assert response.json() == {
- "dag_runs": [GRID_RUN_1, GRID_RUN_2],
- }
+ assert response.json() == [
+ GRID_RUN_1,
+ GRID_RUN_2,
+ ]
@pytest.mark.parametrize(
"order_by,expected",
[
(
"logical_date",
- {
- "dag_runs": [
- GRID_RUN_1,
- GRID_RUN_2,
- ],
- },
+ [
+ GRID_RUN_1,
+ GRID_RUN_2,
+ ],
),
(
"-logical_date",
- {
- "dag_runs": [
- GRID_RUN_2,
- GRID_RUN_1,
- ],
- },
+ [
+ GRID_RUN_2,
+ GRID_RUN_1,
+ ],
),
(
"run_after",
- {
- "dag_runs": [
- GRID_RUN_1,
- GRID_RUN_2,
- ],
- },
+ [
+ GRID_RUN_1,
+ GRID_RUN_2,
+ ],
),
(
"-run_after",
- {
- "dag_runs": [
- GRID_RUN_2,
- GRID_RUN_1,
- ],
- },
+ [
+ GRID_RUN_2,
+ GRID_RUN_1,
+ ],
),
],
)
def test_should_response_200_order_by(self, test_client, order_by,
expected):
- response = test_client.get(f"/grid/{DAG_ID}", params={"order_by":
order_by})
- assert response.status_code == 200
- assert response.json() == expected
-
- @pytest.mark.parametrize(
- "include_upstream, include_downstream, expected",
- [
- (
- "true",
- "false",
- {
- "dag_runs": [
- {
- **GRID_RUN_1,
- "task_instances": [
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 3,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 3,
- "task_id": "mapped_task_group",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 3,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 3,
- "task_id": "mapped_task_group.subtask",
- "try_number": 0,
- },
- ],
- },
- {
- **GRID_RUN_2,
- "task_instances": [
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 1,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 1,
- "scheduled": 0,
- "skipped": 0,
- "success": 1,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "end_date": "2024-12-30T01:02:03Z",
- "start_date": "2024-12-30T01:00:00Z",
- "state": "running",
- "task_count": 3,
- "task_id": "mapped_task_group",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 1,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 1,
- "scheduled": 0,
- "skipped": 0,
- "success": 1,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "end_date": "2024-12-30T01:02:03Z",
- "start_date": "2024-12-30T01:00:00Z",
- "state": "running",
- "task_count": 3,
- "task_id": "mapped_task_group.subtask",
- "try_number": 0,
- },
- ],
- },
- ],
- },
- ),
- (
- "false",
- "true",
- {
- "dag_runs": [
- {
- **GRID_RUN_1,
- "task_instances": [
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 3,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 3,
- "task_id": "mapped_task_group",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 3,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 3,
- "task_id": "mapped_task_group.subtask",
- "try_number": 0,
- },
- ],
- },
- {
- **GRID_RUN_2,
- "task_instances": [
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 1,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 1,
- "scheduled": 0,
- "skipped": 0,
- "success": 1,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "end_date": "2024-12-30T01:02:03Z",
- "start_date": "2024-12-30T01:00:00Z",
- "state": "running",
- "task_count": 3,
- "task_id": "mapped_task_group",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 1,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 1,
- "scheduled": 0,
- "skipped": 0,
- "success": 1,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "end_date": "2024-12-30T01:02:03Z",
- "start_date": "2024-12-30T01:00:00Z",
- "state": "running",
- "task_count": 3,
- "task_id": "mapped_task_group.subtask",
- "try_number": 0,
- },
- ],
- },
- ],
- },
- ),
- ],
- )
- def test_should_response_200_include_upstream_downstream(
- self, test_client, include_upstream, include_downstream, expected
- ):
- response = test_client.get(
- f"/grid/{DAG_ID}",
- params={
- "root": SUB_TASK_ID,
- "include_upstream": include_upstream,
- "include_downstream": include_downstream,
- },
- )
+ response = test_client.get(f"/grid/runs/{DAG_ID}", params={"order_by":
order_by})
assert response.status_code == 200
assert response.json() == expected
@@ -870,163 +320,98 @@ class TestGetGridDataEndpoint:
[
(
1,
- {
- "dag_runs": [GRID_RUN_1],
- },
+ [GRID_RUN_1],
),
(
2,
- {
- "dag_runs": [GRID_RUN_1, GRID_RUN_2],
- },
+ [GRID_RUN_1, GRID_RUN_2],
),
],
)
def test_should_response_200_limit(self, test_client, limit, expected):
- response = test_client.get(f"/grid/{DAG_ID}", params={"limit": limit})
+ response = test_client.get(f"/grid/runs/{DAG_ID}", params={"limit":
limit})
assert response.status_code == 200
assert response.json() == expected
@pytest.mark.parametrize(
"params, expected",
[
- (
- {
- "logical_date_gte": timezone.datetime(2024, 11, 30),
- "logical_date_lte": timezone.datetime(2024, 11, 30),
- },
- {
- "dag_runs": [GRID_RUN_1],
- },
- ),
- (
- {
- "logical_date_gte": timezone.datetime(2024, 10, 30),
- "logical_date_lte": timezone.datetime(2024, 10, 30),
- },
- {"dag_runs": []},
- ),
(
{
"run_after_gte": timezone.datetime(2024, 11, 30),
"run_after_lte": timezone.datetime(2024, 11, 30),
},
- {
- "dag_runs": [GRID_RUN_1, GRID_RUN_2],
- },
+ [GRID_RUN_1, GRID_RUN_2],
),
(
{
"run_after_gte": timezone.datetime(2024, 10, 30),
"run_after_lte": timezone.datetime(2024, 10, 30),
},
- {"dag_runs": []},
+ [],
),
],
)
- def test_should_response_200_date_filters(self, test_client, params,
expected):
+ def test_runs_should_response_200_date_filters(self, test_client, params,
expected):
response = test_client.get(
- f"/grid/{DAG_ID}",
+ f"/grid/runs/{DAG_ID}",
params=params,
)
assert response.status_code == 200
assert response.json() == expected
@pytest.mark.parametrize(
- "run_type, expected",
- [
- (
- ["manual"],
- {
- "dag_runs": [GRID_RUN_2],
- },
- ),
- (
- ["scheduled"],
- {
- "dag_runs": [GRID_RUN_1],
- },
- ),
- ],
- )
- def test_should_response_200_run_types(self, test_client, run_type,
expected):
- response = test_client.get(f"/grid/{DAG_ID}", params={"run_type":
run_type})
- assert response.status_code == 200
- assert response.json() == expected
-
- @pytest.mark.parametrize(
- "run_type, expected",
- [
- (
- ["invalid"],
- {"detail": f"Invalid value for run type. Valid values are {',
'.join(DagRunType)}"},
- )
- ],
- )
- def test_should_response_200_run_types_invalid(self, test_client,
run_type, expected):
- response = test_client.get(f"/grid/{DAG_ID}", params={"run_type":
run_type})
- assert response.status_code == 422
- assert response.json() == expected
-
- @pytest.mark.parametrize(
- "state, expected",
+ "params, expected",
[
(
- ["success"],
{
- "dag_runs": [GRID_RUN_1],
+ "run_after_gte": timezone.datetime(2024, 11, 30),
+ "run_after_lte": timezone.datetime(2024, 11, 30),
},
+ GRID_NODES,
),
(
- ["failed"],
{
- "dag_runs": [GRID_RUN_2],
+ "run_after_gte": timezone.datetime(2024, 10, 30),
+ "run_after_lte": timezone.datetime(2024, 10, 30),
},
- ),
- (
- ["running"],
- {"dag_runs": []},
+ GRID_NODES,
),
],
)
- def test_should_response_200_states(self, test_client, state, expected):
- response = test_client.get(f"/grid/{DAG_ID}", params={"state": state})
+ def test_structure_should_response_200_date_filters(self, test_client,
params, expected):
+ response = test_client.get(
+ f"/grid/structure/{DAG_ID}",
+ params=params,
+ )
assert response.status_code == 200
assert response.json() == expected
- @pytest.mark.parametrize(
- "state, expected",
- [
- (
- ["invalid"],
- {"detail": f"Invalid value for state. Valid values are {',
'.join(DagRunState)}"},
- )
- ],
- )
- def test_should_response_200_states_invalid(self, test_client, state,
expected):
- response = test_client.get(f"/grid/{DAG_ID}", params={"state": state})
- assert response.status_code == 422
- assert response.json() == expected
-
- def test_should_response_401(self, unauthenticated_test_client):
- response = unauthenticated_test_client.get(f"/grid/{DAG_ID_3}")
+ @pytest.mark.parametrize("endpoint", ["runs", "structure", "latest_run"])
+ def test_should_response_401(self, unauthenticated_test_client, endpoint):
+ response =
unauthenticated_test_client.get(f"/grid/{endpoint}/{DAG_ID_3}")
assert response.status_code == 401
- def test_should_response_403(self, unauthorized_test_client):
- response = unauthorized_test_client.get(f"/grid/{DAG_ID_3}")
+ @pytest.mark.parametrize("endpoint", ["runs", "structure", "latest_run"])
+ def test_should_response_403(self, unauthorized_test_client, endpoint):
+ response = unauthorized_test_client.get(f"/grid/{endpoint}/{DAG_ID_3}")
assert response.status_code == 403
- def test_should_response_404(self, test_client):
- response = test_client.get("/grid/invalid_dag")
+ @pytest.mark.parametrize("endpoint", ["runs", "structure"])
+ def test_should_response_404(self, test_client, endpoint):
+ response = test_client.get(f"/grid/{endpoint}/invalid_dag")
assert response.status_code == 404
assert response.json() == {"detail": "Dag with id invalid_dag was not
found"}
- def test_should_response_200_without_dag_run(self, test_client):
- response = test_client.get(f"/grid/{DAG_ID_2}")
+ def test_structure_should_response_200_without_dag_run(self, test_client):
+ response = test_client.get(f"/grid/structure/{DAG_ID_2}")
assert response.status_code == 200
- assert response.json() == {
- "dag_runs": [],
- }
+ assert response.json() == [{"id": "task2", "label": "task2"}]
+
+ def test_runs_should_response_200_without_dag_run(self, test_client):
+ response = test_client.get(f"/grid/runs/{DAG_ID_2}")
+ assert response.status_code == 200
+ assert response.json() == []
def test_should_response_200_with_deleted_task_and_taskgroup(self,
session, test_client):
# Mark one of the TI of the previous runs as "REMOVED" to simulate
clearing an older DagRun.
@@ -1038,121 +423,17 @@ class TestGetGridDataEndpoint:
ti.dag_version = session.scalar(select(DagModel).where(DagModel.dag_id == DAG_ID_3)).dag_versions[-1]
session.commit()
- response = test_client.get(f"/grid/{DAG_ID_3}")
+ response = test_client.get(f"/grid/structure/{DAG_ID_3}")
assert response.status_code == 200
- assert response.json() == {
- "dag_runs": [
- {
- "dag_run_id": "run_3",
- "data_interval_end": "2024-11-30T00:00:00Z",
- "data_interval_start": "2024-11-29T00:00:00Z",
- "logical_date": "2024-11-30T00:00:00Z",
- "queued_at": "2024-12-31T00:00:00Z",
- "run_after": "2024-11-30T00:00:00Z",
- "run_type": "scheduled",
- "state": "queued",
- "task_instances": [
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 1,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 1,
- "task_id": "task_group",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 1,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 1,
- "task_id": "task3",
- "try_number": 0,
- },
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 1,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 1,
- "task_id": "task_group.inner_task",
- "try_number": 0,
- },
- ],
- },
- {
- "dag_run_id": "run_4",
- "data_interval_end": "2024-11-30T00:00:00Z",
- "data_interval_start": "2024-11-29T00:00:00Z",
- "end_date": "2024-12-31T00:00:00Z",
- "logical_date": "2024-12-01T00:00:00Z",
- "run_after": "2024-11-30T00:00:00Z",
- "run_type": "manual",
- "start_date": "2024-11-30T00:00:00Z",
- "state": "success",
- "task_instances": [
- {
- "child_states": {
- "deferred": 0,
- "failed": 0,
- "no_status": 0,
- "queued": 0,
- "removed": 0,
- "restarting": 0,
- "running": 0,
- "scheduled": 0,
- "skipped": 0,
- "success": 1,
- "up_for_reschedule": 0,
- "up_for_retry": 0,
- "upstream_failed": 0,
- },
- "state": "success",
- "task_count": 1,
- "task_id": "task3",
- "try_number": 0,
- },
- ],
- },
- ],
- }
+ assert response.json() == [
+ {"id": "task3", "label": "task3"},
+ {"id": "task4", "label": "task4"},
+ {
+ "children": [{"id": "task_group.inner_task", "label": "inner_task"}],
+ "id": "task_group",
+ "label": "task_group",
+ },
+ ]
def test_get_dag_structure(self, session, test_client):
session.commit()