pierrejeambrun commented on code in PR #44332: URL: https://github.com/apache/airflow/pull/44332#discussion_r1886595870
########## airflow/api_fastapi/core_api/routes/ui/grid.py: ########## @@ -0,0 +1,229 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import collections +import itertools +from typing import Annotated + +from fastapi import Depends, HTTPException, Request, status +from sqlalchemy import select + +from airflow import DAG +from airflow.api_fastapi.common.db.common import SessionDep, paginated_select +from airflow.api_fastapi.common.parameters import ( + OptionalDateTimeQuery, + QueryDagRunRunTypesFilter, + QueryDagRunStateFilter, + QueryIncludeDownstream, + QueryIncludeUpstream, + QueryLimit, + QueryOffset, + Range, + RangeFilter, + SortParam, +) +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.datamodels.ui.grid import ( + GridDAGRunwithTIs, + GridResponse, +) +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.services.ui.grid import ( + fill_task_instance_summaries, + get_child_task_map, + get_dag_run_sort_param, + get_task_group_map, +) +from airflow.models import DagRun, TaskInstance +from airflow.models.dagrun import DagRunNote +from airflow.models.taskinstance 
import TaskInstanceNote + +grid_router = AirflowRouter(prefix="/grid", tags=["Grid"]) + + +@grid_router.get( + "/{dag_id}", + include_in_schema=False, + responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]), +) +def grid_data( + dag_id: str, + run_types: QueryDagRunRunTypesFilter, + run_states: QueryDagRunStateFilter, + session: SessionDep, + offset: QueryOffset, + request: Request, + limit: QueryLimit, + order_by: Annotated[ + SortParam, + Depends( + SortParam( + ["logical_date", "data_interval_start", "data_interval_end", "start_date", "end_date"], DagRun + ).dynamic_depends() + ), + ], + include_upstream: QueryIncludeUpstream = False, + include_downstream: QueryIncludeDownstream = False, + logical_date_gte: OptionalDateTimeQuery = None, + logical_date_lte: OptionalDateTimeQuery = None, + root: str | None = None, +) -> GridResponse: + """Return grid data.""" + dag: DAG = request.app.state.dag_bag.get_dag(dag_id) + if not dag: + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found") + + date_filter = RangeFilter( + Range(lower_bound=logical_date_gte, upper_bound=logical_date_lte), + attribute=DagRun.logical_date, + ) + # Retrieve, sort and encode the previous DAG Runs + base_query = ( + select( + DagRun.run_id, + DagRun.queued_at, + DagRun.start_date, + DagRun.end_date, + DagRun.state, + DagRun.run_type, + DagRun.data_interval_start, + DagRun.data_interval_end, + DagRun.dag_version_id.label("version_number"), + DagRunNote.content.label("note"), + ) + .join(DagRun.dag_run_note, isouter=True) + .select_from(DagRun) + .where(DagRun.dag_id == dag.dag_id) + ) + + dag_runs_select_filter, _ = paginated_select( + statement=base_query, + filters=[ + run_types, + run_states, + date_filter, + ], + order_by=get_dag_run_sort_param(dag=dag, request_order_by=order_by), + offset=offset, + limit=limit, + ) + + dag_runs = session.execute(dag_runs_select_filter) + + # Check if there are any DAG Runs with 
given criteria to eliminate unnecessary queries/errors + if not dag_runs: + return GridResponse(dag_runs=[]) + + # Retrieve, sort and encode the Task Instances + tis_of_dag_runs, _ = paginated_select( + statement=select( + TaskInstance.run_id, + TaskInstance.task_id, + TaskInstance.try_number, + TaskInstance.state, + TaskInstance.start_date, + TaskInstance.end_date, + TaskInstance.queued_dttm.label("queued_dttm"), + TaskInstanceNote.content.label("note"), Review Comment: Same here `select(TaskInstance)` should be enough. ########## airflow/api_fastapi/core_api/routes/ui/grid.py: ########## @@ -0,0 +1,229 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +import collections +import itertools +from typing import Annotated + +from fastapi import Depends, HTTPException, Request, status +from sqlalchemy import select + +from airflow import DAG +from airflow.api_fastapi.common.db.common import SessionDep, paginated_select +from airflow.api_fastapi.common.parameters import ( + OptionalDateTimeQuery, + QueryDagRunRunTypesFilter, + QueryDagRunStateFilter, + QueryIncludeDownstream, + QueryIncludeUpstream, + QueryLimit, + QueryOffset, + Range, + RangeFilter, + SortParam, +) +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.datamodels.ui.grid import ( + GridDAGRunwithTIs, + GridResponse, +) +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.services.ui.grid import ( + fill_task_instance_summaries, + get_child_task_map, + get_dag_run_sort_param, + get_task_group_map, +) +from airflow.models import DagRun, TaskInstance +from airflow.models.dagrun import DagRunNote +from airflow.models.taskinstance import TaskInstanceNote + +grid_router = AirflowRouter(prefix="/grid", tags=["Grid"]) + + +@grid_router.get( + "/{dag_id}", + include_in_schema=False, + responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]), +) +def grid_data( + dag_id: str, + run_types: QueryDagRunRunTypesFilter, + run_states: QueryDagRunStateFilter, + session: SessionDep, + offset: QueryOffset, + request: Request, + limit: QueryLimit, + order_by: Annotated[ + SortParam, + Depends( + SortParam( + ["logical_date", "data_interval_start", "data_interval_end", "start_date", "end_date"], DagRun + ).dynamic_depends() + ), + ], + include_upstream: QueryIncludeUpstream = False, + include_downstream: QueryIncludeDownstream = False, + logical_date_gte: OptionalDateTimeQuery = None, + logical_date_lte: OptionalDateTimeQuery = None, + root: str | None = None, +) -> 
GridResponse: + """Return grid data.""" + dag: DAG = request.app.state.dag_bag.get_dag(dag_id) + if not dag: + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found") + + date_filter = RangeFilter( + Range(lower_bound=logical_date_gte, upper_bound=logical_date_lte), + attribute=DagRun.logical_date, + ) + # Retrieve, sort and encode the previous DAG Runs Review Comment: I don't think encoding is done here. ```suggestion # Retrieve, sort the previous DAG Runs ``` ########## airflow/api_fastapi/core_api/routes/ui/grid.py: ########## @@ -0,0 +1,229 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +import collections +import itertools +from typing import Annotated + +from fastapi import Depends, HTTPException, Request, status +from sqlalchemy import select + +from airflow import DAG +from airflow.api_fastapi.common.db.common import SessionDep, paginated_select +from airflow.api_fastapi.common.parameters import ( + OptionalDateTimeQuery, + QueryDagRunRunTypesFilter, + QueryDagRunStateFilter, + QueryIncludeDownstream, + QueryIncludeUpstream, + QueryLimit, + QueryOffset, + Range, + RangeFilter, + SortParam, +) +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.datamodels.ui.grid import ( + GridDAGRunwithTIs, + GridResponse, +) +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.services.ui.grid import ( + fill_task_instance_summaries, + get_child_task_map, + get_dag_run_sort_param, + get_task_group_map, +) +from airflow.models import DagRun, TaskInstance +from airflow.models.dagrun import DagRunNote +from airflow.models.taskinstance import TaskInstanceNote + +grid_router = AirflowRouter(prefix="/grid", tags=["Grid"]) + + +@grid_router.get( + "/{dag_id}", + include_in_schema=False, Review Comment: This is not needed anymore, it has been included at the router level. ```suggestion ``` ########## tests/api_fastapi/core_api/routes/ui/test_grid.py: ########## Review Comment: We also need to test query parameters (filters, sorting, limit etc...) ########## airflow/api_fastapi/core_api/services/ui/grid.py: ########## @@ -0,0 +1,269 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import operator +from functools import cache + +from typing_extensions import Any + +from airflow import DAG +from airflow.api_fastapi.common.db.common import SessionDep +from airflow.api_fastapi.common.parameters import ( + SortParam, +) +from airflow.api_fastapi.core_api.datamodels.ui.grid import ( + GridTaskInstanceSummary, +) +from airflow.configuration import conf +from airflow.exceptions import AirflowConfigException +from airflow.models import DagRun, MappedOperator +from airflow.models.baseoperator import BaseOperator +from airflow.models.taskmap import TaskMap +from airflow.utils.state import TaskInstanceState +from airflow.utils.task_group import MappedTaskGroup, TaskGroup + + +def get_dag_run_sort_param(dag: DAG, request_order_by: SortParam) -> SortParam: + """ + Get the Sort Param for the DAG Run. + + Data interval columns are NULL for runs created before 2.3, but SQL's + NULL-sorting logic would make those old runs always appear first. In a + perfect world we'd want to sort by ``get_run_data_interval()``, but that's + not efficient, so instead if the run_ordering is data_interval_start or data_interval_end, + we sort by logical_date instead. + + :param dag: DAG + :param request_order_by: Request Order By + + :return: Sort Param + """ + if request_order_by and request_order_by.value != request_order_by.get_primary_key_string(): Review Comment: ```python if `request_order_by` ``` How can this evaluate to false ? Is this check necessary ? 
########## airflow/api_fastapi/core_api/services/ui/grid.py: ########## @@ -0,0 +1,269 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import operator +from functools import cache + +from typing_extensions import Any + +from airflow import DAG +from airflow.api_fastapi.common.db.common import SessionDep +from airflow.api_fastapi.common.parameters import ( + SortParam, +) +from airflow.api_fastapi.core_api.datamodels.ui.grid import ( + GridTaskInstanceSummary, +) +from airflow.configuration import conf +from airflow.exceptions import AirflowConfigException +from airflow.models import DagRun, MappedOperator +from airflow.models.baseoperator import BaseOperator +from airflow.models.taskmap import TaskMap +from airflow.utils.state import TaskInstanceState +from airflow.utils.task_group import MappedTaskGroup, TaskGroup + + +def get_dag_run_sort_param(dag: DAG, request_order_by: SortParam) -> SortParam: Review Comment: This function needs refactoring I think. 
The logic is obfuscated, 'if no order_by param is specified then we default to the first field of `timetable.run_ordering`' ########## tests/api_fastapi/core_api/routes/ui/test_grid.py: ########## @@ -0,0 +1,1194 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +from datetime import timedelta + +import pendulum +import pytest + +from airflow.decorators import task_group +from airflow.models import DagBag +from airflow.operators.empty import EmptyOperator +from airflow.utils import timezone +from airflow.utils.session import provide_session +from airflow.utils.state import DagRunState, TaskInstanceState +from airflow.utils.task_group import TaskGroup +from airflow.utils.types import DagRunTriggeredByType, DagRunType + +from tests_common.test_utils.db import clear_db_assets, clear_db_dags, clear_db_runs, clear_db_serialized_dags +from tests_common.test_utils.mock_operators import MockOperator + +pytestmark = pytest.mark.db_test + +DAG_ID = "test_dag" +DAG_ID_2 = "test_dag_2" +TASK_ID = "task" +TASK_ID_2 = "task2" +SUB_TASK_ID = "subtask" +MAPPED_TASK_ID = "mapped_task" +TASK_GROUP_ID = "task_group" +INNER_TASK_GROUP = "inner_task_group" +INNER_TASK_GROUP_SUB_TASK = "inner_task_group_sub_task" + + [email protected](autouse=True, scope="module") +def examples_dag_bag(): + # Speed up: We don't want example dags for this module + return DagBag(include_examples=False, read_dags_from_db=True) + + [email protected](autouse=True) +@provide_session +def setup(dag_maker, session=None): + clear_db_runs() + clear_db_dags() + clear_db_serialized_dags() + + with dag_maker(dag_id=DAG_ID, serialized=True, session=session) as dag: + EmptyOperator(task_id=TASK_ID) + + @task_group + def mapped_task_group(arg1): + return MockOperator(task_id=SUB_TASK_ID, arg1=arg1) + + mapped_task_group.expand(arg1=["a", "b", "c"]) + with TaskGroup(group_id=TASK_GROUP_ID): + MockOperator.partial(task_id=MAPPED_TASK_ID).expand(arg1=["a", "b", "c", "d"]) + with TaskGroup(group_id=INNER_TASK_GROUP): + MockOperator.partial(task_id=INNER_TASK_GROUP_SUB_TASK).expand(arg1=["a", "b"]) + + triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} + logical_date = timezone.datetime(2024, 11, 30) + + data_interval = 
dag.timetable.infer_manual_data_interval(run_after=logical_date) + run_1 = dag_maker.create_dagrun( + run_id="run_1", + state=DagRunState.SUCCESS, + run_type=DagRunType.SCHEDULED, + logical_date=logical_date, + data_interval=data_interval, + **triggered_by_kwargs, + ) + run_2 = dag_maker.create_dagrun( + run_id="run_2", + run_type=DagRunType.MANUAL, + state=DagRunState.FAILED, + logical_date=logical_date + timedelta(days=1), + data_interval=data_interval, + **triggered_by_kwargs, + ) + for ti in run_1.task_instances: + ti.state = TaskInstanceState.SUCCESS + for ti in sorted(run_2.task_instances, key=lambda ti: (ti.task_id, ti.map_index)): + if ti.task_id == TASK_ID: + ti.state = TaskInstanceState.SUCCESS + elif ti.task_id == "group.mapped": + if ti.map_index == 0: + ti.state = TaskInstanceState.SUCCESS + ti.start_date = pendulum.DateTime(2024, 12, 30, 1, 0, 0, tzinfo=pendulum.UTC) + ti.end_date = pendulum.DateTime(2024, 12, 30, 1, 2, 3, tzinfo=pendulum.UTC) + elif ti.map_index == 1: + ti.state = TaskInstanceState.RUNNING + ti.start_date = pendulum.DateTime(2024, 12, 30, 2, 3, 4, tzinfo=pendulum.UTC) + ti.end_date = None + + session.flush() + + with dag_maker(dag_id=DAG_ID_2, serialized=True, session=session): + EmptyOperator(task_id=TASK_ID_2) + + [email protected](autouse=True) +def _clean(): + clear_db_runs() + clear_db_assets() + yield + clear_db_runs() + clear_db_assets() + + +# Create this as a fixture so that it is applied before the `dag_with_runs` fixture is! 
[email protected](autouse=True) +def _freeze_time_for_dagruns(time_machine): + time_machine.move_to("2024-12-01T00:00:00+00:00", tick=False) + + [email protected]("_freeze_time_for_dagruns") +class TestGetGridDataEndpoint: + def test_should_response_200(self, test_client): + response = test_client.get(f"/ui/grid/{DAG_ID}") + assert response.status_code == 200 + print(response.json()) + assert response.json() == { + "dag_runs": [ + { + "data_interval_end": "2024-11-30T00:00:00Z", + "data_interval_start": "2024-11-29T00:00:00Z", + "end_date": "2024-12-01T00:00:00Z", + "note": None, + "queued_at": None, + "dag_run_id": "run_1", + "run_type": "scheduled", + "start_date": "2016-01-01T00:00:00Z", + "state": "success", + "task_instances": [ + { + "end_date": None, + "note": None, + "state": "success", + "queued_dttm": None, + "start_date": None, + "child_states": { + "deferred": 0, + "failed": 0, + "no_status": 0, + "queued": 0, + "removed": 0, + "restarting": 0, + "running": 0, + "scheduled": 0, + "skipped": 0, + "success": 3, + "up_for_reschedule": 0, + "up_for_retry": 0, + "upstream_failed": 0, + }, + "task_count": 3, + "task_id": "mapped_task_group", + "try_number": 0, + }, + { + "end_date": None, + "note": None, + "state": "success", + "queued_dttm": None, + "start_date": None, + "child_states": { + "deferred": 0, + "failed": 0, + "no_status": 0, + "queued": 0, + "removed": 0, + "restarting": 0, + "running": 0, + "scheduled": 0, + "skipped": 0, + "success": 2, + "up_for_reschedule": 0, + "up_for_retry": 0, + "upstream_failed": 0, + }, + "task_count": 2, + "task_id": "task_group.inner_task_group", + "try_number": 0, + }, + { + "end_date": None, + "note": None, + "state": "success", + "queued_dttm": None, + "start_date": None, + "child_states": { + "deferred": 0, + "failed": 0, + "no_status": 0, + "queued": 0, + "removed": 0, + "restarting": 0, + "running": 0, + "scheduled": 0, + "skipped": 0, + "success": 5, + "up_for_reschedule": 0, + "up_for_retry": 0, + 
"upstream_failed": 0, + }, + "task_count": 5, + "task_id": "task_group", + "try_number": 0, + }, + { + "end_date": None, + "note": None, + "state": "success", + "queued_dttm": None, + "start_date": None, + "child_states": { + "deferred": 0, + "failed": 0, + "no_status": 0, + "queued": 0, + "removed": 0, + "restarting": 0, + "running": 0, + "scheduled": 0, + "skipped": 0, + "success": 3, + "up_for_reschedule": 0, + "up_for_retry": 0, + "upstream_failed": 0, + }, + "task_count": 3, + "task_id": "mapped_task_group.subtask", + "try_number": 0, + }, + { + "end_date": None, + "note": None, + "state": "success", + "queued_dttm": None, + "start_date": None, + "child_states": { + "deferred": 0, + "failed": 0, + "no_status": 0, + "queued": 0, + "removed": 0, + "restarting": 0, + "running": 0, + "scheduled": 0, + "skipped": 0, + "success": 1, + "up_for_reschedule": 0, + "up_for_retry": 0, + "upstream_failed": 0, + }, + "task_count": 1, + "task_id": "task", + "try_number": 0, + }, + { + "end_date": None, + "note": None, + "state": "success", + "queued_dttm": None, + "start_date": None, + "child_states": { + "deferred": 0, + "failed": 0, + "no_status": 0, + "queued": 0, + "removed": 0, + "restarting": 0, + "running": 0, + "scheduled": 0, + "skipped": 0, + "success": 2, + "up_for_reschedule": 0, + "up_for_retry": 0, + "upstream_failed": 0, + }, + "task_count": 2, + "task_id": "task_group.inner_task_group.inner_task_group_sub_task", + "try_number": 0, + }, + { + "end_date": None, + "note": None, + "state": "success", + "queued_dttm": None, + "start_date": None, + "child_states": { + "deferred": 0, + "failed": 0, + "no_status": 0, + "queued": 0, + "removed": 0, + "restarting": 0, + "running": 0, + "scheduled": 0, + "skipped": 0, + "success": 4, + "up_for_reschedule": 0, + "up_for_retry": 0, + "upstream_failed": 0, + }, + "task_count": 4, + "task_id": "task_group.mapped_task", + "try_number": 0, + }, + ], + "version_number": None, + }, + { + "data_interval_end": 
"2024-11-30T00:00:00Z", + "data_interval_start": "2024-11-29T00:00:00Z", + "end_date": "2024-12-01T00:00:00Z", + "note": None, + "queued_at": None, + "dag_run_id": "run_2", + "run_type": "manual", + "start_date": "2016-01-01T00:00:00Z", + "state": "failed", + "task_instances": [ + { + "end_date": None, + "note": None, + "state": None, + "queued_dttm": None, + "start_date": None, + "child_states": { + "deferred": 0, + "failed": 0, + "no_status": 3, + "queued": 0, + "removed": 0, + "restarting": 0, + "running": 0, + "scheduled": 0, + "skipped": 0, + "success": 0, + "up_for_reschedule": 0, + "up_for_retry": 0, + "upstream_failed": 0, + }, + "task_count": 3, + "task_id": "mapped_task_group", + "try_number": 0, + }, + { + "end_date": None, + "note": None, + "state": None, + "queued_dttm": None, + "start_date": None, + "child_states": { + "deferred": 0, + "failed": 0, + "no_status": 2, + "queued": 0, + "removed": 0, + "restarting": 0, + "running": 0, + "scheduled": 0, + "skipped": 0, + "success": 0, + "up_for_reschedule": 0, + "up_for_retry": 0, + "upstream_failed": 0, + }, + "task_count": 2, + "task_id": "task_group.inner_task_group", + "try_number": 0, + }, + { + "end_date": None, + "note": None, + "state": None, + "queued_dttm": None, + "start_date": None, + "child_states": { + "deferred": 0, + "failed": 0, + "no_status": 5, + "queued": 0, + "removed": 0, + "restarting": 0, + "running": 0, + "scheduled": 0, + "skipped": 0, + "success": 0, + "up_for_reschedule": 0, + "up_for_retry": 0, + "upstream_failed": 0, + }, + "task_count": 5, + "task_id": "task_group", + "try_number": 0, + }, + { + "end_date": None, + "note": None, + "state": None, + "queued_dttm": None, + "start_date": None, + "child_states": { + "deferred": 0, + "failed": 0, + "no_status": 3, + "queued": 0, + "removed": 0, + "restarting": 0, + "running": 0, + "scheduled": 0, + "skipped": 0, + "success": 0, + "up_for_reschedule": 0, + "up_for_retry": 0, + "upstream_failed": 0, + }, + "task_count": 3, + 
"task_id": "mapped_task_group.subtask", + "try_number": 0, + }, + { + "end_date": None, + "note": None, + "state": "success", + "queued_dttm": None, + "start_date": None, + "child_states": { + "deferred": 0, + "failed": 0, + "no_status": 0, + "queued": 0, + "removed": 0, + "restarting": 0, + "running": 0, + "scheduled": 0, + "skipped": 0, + "success": 1, + "up_for_reschedule": 0, + "up_for_retry": 0, + "upstream_failed": 0, + }, + "task_count": 1, + "task_id": "task", + "try_number": 0, + }, + { + "end_date": None, + "note": None, + "state": None, + "queued_dttm": None, + "start_date": None, + "child_states": { + "deferred": 0, + "failed": 0, + "no_status": 2, + "queued": 0, + "removed": 0, + "restarting": 0, + "running": 0, + "scheduled": 0, + "skipped": 0, + "success": 0, + "up_for_reschedule": 0, + "up_for_retry": 0, + "upstream_failed": 0, + }, + "task_count": 2, + "task_id": "task_group.inner_task_group.inner_task_group_sub_task", + "try_number": 0, + }, + { + "end_date": None, + "note": None, + "state": None, + "queued_dttm": None, + "start_date": None, + "child_states": { + "deferred": 0, + "failed": 0, + "no_status": 4, + "queued": 0, + "removed": 0, + "restarting": 0, + "running": 0, + "scheduled": 0, + "skipped": 0, + "success": 0, + "up_for_reschedule": 0, + "up_for_retry": 0, + "upstream_failed": 0, + }, + "task_count": 4, + "task_id": "task_group.mapped_task", + "try_number": 0, + }, + ], + "version_number": None, + }, + ], + } + + def test_should_response_200_order_by(self, test_client): + response = test_client.get(f"/ui/grid/{DAG_ID}?order_by=-logical_date") + assert response.status_code == 200 + print(response.json()) Review Comment: To remove (multiple of them) ########## airflow/api_fastapi/core_api/routes/ui/grid.py: ########## @@ -0,0 +1,229 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import collections +import itertools +from typing import Annotated + +from fastapi import Depends, HTTPException, Request, status +from sqlalchemy import select + +from airflow import DAG +from airflow.api_fastapi.common.db.common import SessionDep, paginated_select +from airflow.api_fastapi.common.parameters import ( + OptionalDateTimeQuery, + QueryDagRunRunTypesFilter, + QueryDagRunStateFilter, + QueryIncludeDownstream, + QueryIncludeUpstream, + QueryLimit, + QueryOffset, + Range, + RangeFilter, + SortParam, +) +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.datamodels.ui.grid import ( + GridDAGRunwithTIs, + GridResponse, +) +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.services.ui.grid import ( + fill_task_instance_summaries, + get_child_task_map, + get_dag_run_sort_param, + get_task_group_map, +) +from airflow.models import DagRun, TaskInstance +from airflow.models.dagrun import DagRunNote +from airflow.models.taskinstance import TaskInstanceNote + +grid_router = AirflowRouter(prefix="/grid", tags=["Grid"]) + + +@grid_router.get( + "/{dag_id}", + include_in_schema=False, + 
responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]), +) +def grid_data( + dag_id: str, + run_types: QueryDagRunRunTypesFilter, + run_states: QueryDagRunStateFilter, + session: SessionDep, + offset: QueryOffset, + request: Request, + limit: QueryLimit, + order_by: Annotated[ + SortParam, + Depends( + SortParam( + ["logical_date", "data_interval_start", "data_interval_end", "start_date", "end_date"], DagRun + ).dynamic_depends() + ), + ], + include_upstream: QueryIncludeUpstream = False, + include_downstream: QueryIncludeDownstream = False, + logical_date_gte: OptionalDateTimeQuery = None, + logical_date_lte: OptionalDateTimeQuery = None, + root: str | None = None, +) -> GridResponse: + """Return grid data.""" + dag: DAG = request.app.state.dag_bag.get_dag(dag_id) + if not dag: + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found") + + date_filter = RangeFilter( + Range(lower_bound=logical_date_gte, upper_bound=logical_date_lte), + attribute=DagRun.logical_date, + ) + # Retrieve, sort and encode the previous DAG Runs + base_query = ( + select( + DagRun.run_id, + DagRun.queued_at, + DagRun.start_date, + DagRun.end_date, + DagRun.state, + DagRun.run_type, + DagRun.data_interval_start, + DagRun.data_interval_end, + DagRun.dag_version_id.label("version_number"), + DagRunNote.content.label("note"), Review Comment: Why don't we just query a DagRun ? Why is it necessary to make an exhaustive list of specific columns ? Also the note content is an association_proxy on the DagRun model, `DagRun.note` should have it as well. ########## airflow/api_fastapi/core_api/services/ui/grid.py: ########## @@ -0,0 +1,269 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import operator +from functools import cache + +from typing_extensions import Any + +from airflow import DAG +from airflow.api_fastapi.common.db.common import SessionDep +from airflow.api_fastapi.common.parameters import ( + SortParam, +) +from airflow.api_fastapi.core_api.datamodels.ui.grid import ( + GridTaskInstanceSummary, +) +from airflow.configuration import conf +from airflow.exceptions import AirflowConfigException +from airflow.models import DagRun, MappedOperator +from airflow.models.baseoperator import BaseOperator +from airflow.models.taskmap import TaskMap +from airflow.utils.state import TaskInstanceState +from airflow.utils.task_group import MappedTaskGroup, TaskGroup + + +def get_dag_run_sort_param(dag: DAG, request_order_by: SortParam) -> SortParam: + """ + Get the Sort Param for the DAG Run. + + Data interval columns are NULL for runs created before 2.3, but SQL's + NULL-sorting logic would make those old runs always appear first. In a + perfect world we'd want to sort by ``get_run_data_interval()``, but that's + not efficient, so instead if the run_ordering is data_interval_start or data_interval_end, + we sort by logical_date instead. 
+ + :param dag: DAG + :param request_order_by: Request Order By + + :return: Sort Param + """ + if request_order_by and request_order_by.value != request_order_by.get_primary_key_string(): + return request_order_by + + sort_param = SortParam( + allowed_attrs=["logical_date", "data_interval_start", "data_interval_end"], model=DagRun + ) + + for name in dag.timetable.run_ordering: + if name in ("data_interval_start", "data_interval_end"): + return sort_param.set_value(name) + + return sort_param.set_value("logical_date") + + +@cache +def get_task_group_children_getter() -> operator.methodcaller: + """Get the Task Group Children Getter for the DAG.""" + sort_order = conf.get("webserver", "grid_view_sorting_order", fallback="topological") + if sort_order == "topological": + return operator.methodcaller("topological_sort") + if sort_order == "hierarchical_alphabetical": + return operator.methodcaller("hierarchical_alphabetical_sort") + raise AirflowConfigException(f"Unsupported grid_view_sorting_order: {sort_order}") + + +def get_task_group_map(dag: DAG) -> dict[str, dict[str, Any]]: + """ + Get the Task Group Map for the DAG. + + :param dag: DAG + + :return: Task Group Map + """ + task_nodes: dict[str, dict[str, Any]] = {} + + def _is_task_node_mapped_task_group(task_node: BaseOperator | MappedTaskGroup | TaskMap | None) -> bool: + """Check if the Task Node is a Mapped Task Group.""" + return type(task_node) is MappedTaskGroup + + def _append_child_task_count_to_parent( + child_task_count: int | MappedTaskGroup | TaskMap | MappedOperator | None, + parent_node: BaseOperator | MappedTaskGroup | TaskMap | None, + ): + """ + Append the Child Task Count to the Parent. + + This method should only be used for Mapped Models. 
+ """ + if isinstance(parent_node, TaskGroup): + # Remove the regular task counted in parent_node + task_nodes[parent_node.node_id]["task_count"].append(-1) + # Add the mapped task to the parent_node + task_nodes[parent_node.node_id]["task_count"].append(child_task_count) + + def _fill_task_group_map( + task_node: BaseOperator | MappedTaskGroup | TaskMap | None, + parent_node: BaseOperator | MappedTaskGroup | TaskMap | None, + ): + """Recursively fill the Task Group Map.""" + if task_node is None: + return + if isinstance(task_node, MappedOperator): + task_nodes[task_node.node_id] = { + "is_group": False, + "parent_id": parent_node.node_id if parent_node else None, + "task_count": [task_node], + } + # Add the Task Count to the Parent Node because parent node is a Task Group + _append_child_task_count_to_parent(child_task_count=task_node, parent_node=parent_node) + return + elif isinstance(task_node, TaskGroup): + task_count = ( + task_node + if _is_task_node_mapped_task_group(task_node) + else len([child for child in get_task_group_children_getter()(task_node)]) + ) + task_nodes[task_node.node_id] = { + "is_group": True, + "parent_id": parent_node.node_id if parent_node else None, + "task_count": [task_count], + } + return [ + _fill_task_group_map(task_node=child, parent_node=task_node) + for child in get_task_group_children_getter()(task_node) + ] + elif isinstance(task_node, BaseOperator): + task_nodes[task_node.task_id] = { + "is_group": False, + "parent_id": parent_node.node_id if parent_node else None, + "task_count": task_nodes[parent_node.node_id]["task_count"] + if _is_task_node_mapped_task_group(parent_node) and parent_node + else [1], + } + # No Need to Add the Task Count to the Parent Node, these are already counted in Add the Parent + return + + for node in [child for child in get_task_group_children_getter()(dag.task_group)]: + _fill_task_group_map(task_node=node, parent_node=None) + + return task_nodes + + +def get_child_task_map(parent_task_id: str, 
task_node_map: dict[str, dict[str, Any]]): + """Get the Child Task Map for the Parent Task ID.""" + return [task_id for task_id, task_map in task_node_map.items() if task_map["parent_id"] == parent_task_id] + + +def _get_total_task_count( + run_id: str, task_count: list[int | MappedTaskGroup | MappedOperator], session: SessionDep +) -> int: + return sum( + node + if isinstance(node, int) + else ( + node.get_mapped_ti_count(run_id=run_id, session=session) + if isinstance(node, (MappedTaskGroup, MappedOperator)) + else node + ) + for node in task_count + ) + + +def fill_task_instance_summaries( + grouped_task_instances: dict[tuple[str, str], list], + task_instance_summaries_to_fill: dict[str, list], + task_node_map: dict[str, dict[str, Any]], + session: SessionDep, +) -> None: + """ + Fill the Task Instance Summaries for the Grouped Task Instances. + + :param grouped_task_instances: Grouped Task Instances + :param task_instance_summaries_to_fill: Task Instance Summaries to fill + :param task_node_map: Task Node Map + :param session: Session + + :return: None + """ + ## Additional logic to calculate the overall state and task count dict of states + priority: list[None | TaskInstanceState] = [ + TaskInstanceState.FAILED, + TaskInstanceState.UPSTREAM_FAILED, + TaskInstanceState.UP_FOR_RETRY, + TaskInstanceState.UP_FOR_RESCHEDULE, + TaskInstanceState.QUEUED, + TaskInstanceState.SCHEDULED, + TaskInstanceState.DEFERRED, + TaskInstanceState.RUNNING, + TaskInstanceState.RESTARTING, + None, + TaskInstanceState.SUCCESS, + TaskInstanceState.SKIPPED, + TaskInstanceState.REMOVED, + ] Review Comment: I think this needs to be taken out of the function def, I feel like this could be reused ########## airflow/api_fastapi/core_api/services/ui/grid.py: ########## @@ -0,0 +1,269 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import operator +from functools import cache + +from typing_extensions import Any + +from airflow import DAG +from airflow.api_fastapi.common.db.common import SessionDep +from airflow.api_fastapi.common.parameters import ( + SortParam, +) +from airflow.api_fastapi.core_api.datamodels.ui.grid import ( + GridTaskInstanceSummary, +) +from airflow.configuration import conf +from airflow.exceptions import AirflowConfigException +from airflow.models import DagRun, MappedOperator +from airflow.models.baseoperator import BaseOperator +from airflow.models.taskmap import TaskMap +from airflow.utils.state import TaskInstanceState +from airflow.utils.task_group import MappedTaskGroup, TaskGroup + + +def get_dag_run_sort_param(dag: DAG, request_order_by: SortParam) -> SortParam: + """ + Get the Sort Param for the DAG Run. + + Data interval columns are NULL for runs created before 2.3, but SQL's + NULL-sorting logic would make those old runs always appear first. In a + perfect world we'd want to sort by ``get_run_data_interval()``, but that's + not efficient, so instead if the run_ordering is data_interval_start or data_interval_end, + we sort by logical_date instead. 
+ + :param dag: DAG + :param request_order_by: Request Order By + + :return: Sort Param + """ + if request_order_by and request_order_by.value != request_order_by.get_primary_key_string(): + return request_order_by + + sort_param = SortParam( + allowed_attrs=["logical_date", "data_interval_start", "data_interval_end"], model=DagRun + ) + + for name in dag.timetable.run_ordering: + if name in ("data_interval_start", "data_interval_end"): + return sort_param.set_value(name) + + return sort_param.set_value("logical_date") + + +@cache +def get_task_group_children_getter() -> operator.methodcaller: + """Get the Task Group Children Getter for the DAG.""" + sort_order = conf.get("webserver", "grid_view_sorting_order", fallback="topological") + if sort_order == "topological": + return operator.methodcaller("topological_sort") + if sort_order == "hierarchical_alphabetical": + return operator.methodcaller("hierarchical_alphabetical_sort") + raise AirflowConfigException(f"Unsupported grid_view_sorting_order: {sort_order}") + + +def get_task_group_map(dag: DAG) -> dict[str, dict[str, Any]]: + """ + Get the Task Group Map for the DAG. + + :param dag: DAG + + :return: Task Group Map + """ + task_nodes: dict[str, dict[str, Any]] = {} + + def _is_task_node_mapped_task_group(task_node: BaseOperator | MappedTaskGroup | TaskMap | None) -> bool: + """Check if the Task Node is a Mapped Task Group.""" + return type(task_node) is MappedTaskGroup + + def _append_child_task_count_to_parent( + child_task_count: int | MappedTaskGroup | TaskMap | MappedOperator | None, + parent_node: BaseOperator | MappedTaskGroup | TaskMap | None, + ): + """ + Append the Child Task Count to the Parent. + + This method should only be used for Mapped Models. 
+ """ + if isinstance(parent_node, TaskGroup): + # Remove the regular task counted in parent_node + task_nodes[parent_node.node_id]["task_count"].append(-1) + # Add the mapped task to the parent_node + task_nodes[parent_node.node_id]["task_count"].append(child_task_count) + + def _fill_task_group_map( + task_node: BaseOperator | MappedTaskGroup | TaskMap | None, + parent_node: BaseOperator | MappedTaskGroup | TaskMap | None, + ): + """Recursively fill the Task Group Map.""" + if task_node is None: + return + if isinstance(task_node, MappedOperator): + task_nodes[task_node.node_id] = { + "is_group": False, + "parent_id": parent_node.node_id if parent_node else None, + "task_count": [task_node], + } + # Add the Task Count to the Parent Node because parent node is a Task Group + _append_child_task_count_to_parent(child_task_count=task_node, parent_node=parent_node) + return + elif isinstance(task_node, TaskGroup): + task_count = ( + task_node + if _is_task_node_mapped_task_group(task_node) + else len([child for child in get_task_group_children_getter()(task_node)]) + ) + task_nodes[task_node.node_id] = { + "is_group": True, + "parent_id": parent_node.node_id if parent_node else None, + "task_count": [task_count], + } + return [ + _fill_task_group_map(task_node=child, parent_node=task_node) + for child in get_task_group_children_getter()(task_node) + ] + elif isinstance(task_node, BaseOperator): + task_nodes[task_node.task_id] = { + "is_group": False, + "parent_id": parent_node.node_id if parent_node else None, + "task_count": task_nodes[parent_node.node_id]["task_count"] + if _is_task_node_mapped_task_group(parent_node) and parent_node + else [1], + } + # No Need to Add the Task Count to the Parent Node, these are already counted in Add the Parent + return + + for node in [child for child in get_task_group_children_getter()(dag.task_group)]: + _fill_task_group_map(task_node=node, parent_node=None) + + return task_nodes + + +def get_child_task_map(parent_task_id: str, 
task_node_map: dict[str, dict[str, Any]]): + """Get the Child Task Map for the Parent Task ID.""" + return [task_id for task_id, task_map in task_node_map.items() if task_map["parent_id"] == parent_task_id] + + +def _get_total_task_count( + run_id: str, task_count: list[int | MappedTaskGroup | MappedOperator], session: SessionDep +) -> int: + return sum( + node + if isinstance(node, int) + else ( + node.get_mapped_ti_count(run_id=run_id, session=session) + if isinstance(node, (MappedTaskGroup, MappedOperator)) + else node + ) + for node in task_count + ) + + +def fill_task_instance_summaries( + grouped_task_instances: dict[tuple[str, str], list], + task_instance_summaries_to_fill: dict[str, list], + task_node_map: dict[str, dict[str, Any]], + session: SessionDep, +) -> None: + """ + Fill the Task Instance Summaries for the Grouped Task Instances. + + :param grouped_task_instances: Grouped Task Instances + :param task_instance_summaries_to_fill: Task Instance Summaries to fill + :param task_node_map: Task Node Map + :param session: Session + + :return: None + """ + ## Additional logic to calculate the overall state and task count dict of states + priority: list[None | TaskInstanceState] = [ + TaskInstanceState.FAILED, + TaskInstanceState.UPSTREAM_FAILED, + TaskInstanceState.UP_FOR_RETRY, + TaskInstanceState.UP_FOR_RESCHEDULE, + TaskInstanceState.QUEUED, + TaskInstanceState.SCHEDULED, + TaskInstanceState.DEFERRED, + TaskInstanceState.RUNNING, + TaskInstanceState.RESTARTING, + None, + TaskInstanceState.SUCCESS, + TaskInstanceState.SKIPPED, + TaskInstanceState.REMOVED, + ] + + overall_states: dict[tuple[str, str], str] = { + (task_id, run_id): next( + (str(state.value) for state in priority for ti in tis if state is not None and ti.state == state), + "no_status", + ) + for (task_id, run_id), tis in grouped_task_instances.items() + } + for (task_id, run_id), tis in grouped_task_instances.items(): + ti_try_number = max([ti.try_number for ti in tis]) + ti_start_date = 
min([ti.start_date for ti in tis if ti.start_date], default=None) + ti_end_date = max([ti.end_date for ti in tis if ti.end_date], default=None) + ti_queued_dttm = min([ti.queued_dttm for ti in tis if ti.queued_dttm], default=None) + ti_note = min([ti.note for ti in tis if ti.note], default=None) + + all_states = {"no_status" if state is None else state.name.lower(): 0 for state in priority} + # Update Task States for non-grouped tasks + all_states.update( + { + "no_status" if state is None else state.name.lower(): len( + [ti for ti in tis if ti.state == state] + if not task_node_map[task_id]["is_group"] + else [ + ti + for ti in tis + if ti.state == state and ti.task_id in get_child_task_map(task_id, task_node_map) + ] + ) + for state in priority + } + ) + # Update Nested Task Group + all_states.update( + { Review Comment: Why is all that not similar to the legacy implementation ? What are we trying to achieve by re-implementing that ? ########## airflow/api_fastapi/core_api/routes/ui/grid.py: ########## @@ -0,0 +1,229 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +import collections +import itertools +from typing import Annotated + +from fastapi import Depends, HTTPException, Request, status +from sqlalchemy import select + +from airflow import DAG +from airflow.api_fastapi.common.db.common import SessionDep, paginated_select +from airflow.api_fastapi.common.parameters import ( + OptionalDateTimeQuery, + QueryDagRunRunTypesFilter, + QueryDagRunStateFilter, + QueryIncludeDownstream, + QueryIncludeUpstream, + QueryLimit, + QueryOffset, + Range, + RangeFilter, + SortParam, +) +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.datamodels.ui.grid import ( + GridDAGRunwithTIs, + GridResponse, +) +from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.services.ui.grid import ( + fill_task_instance_summaries, + get_child_task_map, + get_dag_run_sort_param, + get_task_group_map, +) +from airflow.models import DagRun, TaskInstance +from airflow.models.dagrun import DagRunNote +from airflow.models.taskinstance import TaskInstanceNote + +grid_router = AirflowRouter(prefix="/grid", tags=["Grid"]) + + +@grid_router.get( + "/{dag_id}", + include_in_schema=False, + responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]), +) +def grid_data( + dag_id: str, + run_types: QueryDagRunRunTypesFilter, + run_states: QueryDagRunStateFilter, + session: SessionDep, + offset: QueryOffset, + request: Request, + limit: QueryLimit, Review Comment: I think we should default to `default_dag_run_display_number` at some point for the limit ? (num_runs) ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
