This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push:
new fceff1697ec Remove filtering by last dag run state in patch dags
endpoint (#51176) (#51347)
fceff1697ec is described below
commit fceff1697ec939c1145b32b24343e72e52fa7387
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Tue Jun 3 11:44:35 2025 +0200
Remove filtering by last dag run state in patch dags endpoint (#51176)
(#51347)
I don't think this is used. The query is complicated so let's just remove
it.
Meanwhile, it's a bit weird to have an endpoint with pagination that
filters over a mutable characteristic -- such as last run state.
(cherry picked from commit 42003357cc31ee6abda6233446d4e9d5ed78dfb2)
Co-authored-by: Daniel Standish
<[email protected]>
---
.../src/airflow/api_fastapi/common/db/dags.py | 92 ----------------------
.../core_api/openapi/v1-rest-api-generated.yaml | 8 --
.../api_fastapi/core_api/routes/public/dags.py | 8 +-
.../src/airflow/ui/openapi-gen/queries/queries.ts | 5 --
.../ui/openapi-gen/requests/services.gen.ts | 2 -
.../airflow/ui/openapi-gen/requests/types.gen.ts | 1 -
.../core_api/routes/public/test_dags.py | 8 +-
7 files changed, 5 insertions(+), 119 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/common/db/dags.py
b/airflow-core/src/airflow/api_fastapi/common/db/dags.py
deleted file mode 100644
index d9d2b66e6f7..00000000000
--- a/airflow-core/src/airflow/api_fastapi/common/db/dags.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-from sqlalchemy import func, select
-
-if TYPE_CHECKING:
- from sqlalchemy.sql import Select
-
-from airflow.models.dag import DagModel
-from airflow.models.dagrun import DagRun
-
-
-def generate_dag_with_latest_run_query(dag_runs_cte: Select | None = None) ->
Select:
- latest_dag_run_per_dag_id_cte = (
- select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
- .where()
- .group_by(DagRun.dag_id)
- .cte()
- )
-
- dags_select_with_latest_dag_run = (
- select(DagModel)
- .join(
- latest_dag_run_per_dag_id_cte,
- DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
- isouter=True,
- )
- .join(
- DagRun,
- DagRun.start_date == latest_dag_run_per_dag_id_cte.c.start_date
- and DagRun.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
- isouter=True,
- )
- .order_by(DagModel.dag_id)
- )
-
- if dag_runs_cte is None:
- return dags_select_with_latest_dag_run
-
- dag_run_filters_cte = (
- select(DagModel.dag_id)
- .join(
- dag_runs_cte,
- DagModel.dag_id == dag_runs_cte.c.dag_id,
- )
- .join(
- DagRun,
- DagRun.dag_id == dag_runs_cte.c.dag_id,
- )
- .group_by(DagModel.dag_id)
- .cte()
- )
-
- dags_with_latest_and_filtered_runs = (
- select(DagModel)
- .join(
- dag_run_filters_cte,
- dag_run_filters_cte.c.dag_id == DagModel.dag_id,
- )
- .join(
- latest_dag_run_per_dag_id_cte,
- DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
- isouter=True,
- )
- .join(
- DagRun,
- DagRun.start_date == latest_dag_run_per_dag_id_cte.c.start_date
- and DagRun.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
- isouter=True,
- )
- .order_by(DagModel.dag_id)
- )
-
- return dags_with_latest_and_filtered_runs
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml
index 53279035bc7..ff6725b266b 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml
@@ -2844,14 +2844,6 @@ paths:
- type: boolean
- type: 'null'
title: Paused
- - name: last_dag_run_state
- in: query
- required: false
- schema:
- anyOf:
- - $ref: '#/components/schemas/DagRunState'
- - type: 'null'
- title: Last Dag Run State
requestBody:
required: true
content:
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
index f55c5763701..c078f2baf43 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
@@ -31,7 +31,6 @@ from airflow.api_fastapi.common.db.common import (
apply_filters_to_select,
paginated_select,
)
-from airflow.api_fastapi.common.db.dags import
generate_dag_with_latest_run_query
from airflow.api_fastapi.common.parameters import (
FilterOptionEnum,
FilterParam,
@@ -301,7 +300,6 @@ def patch_dags(
dag_id_pattern: QueryDagIdPatternSearchWithNone,
exclude_stale: QueryExcludeStaleFilter,
paused: QueryPausedFilter,
- last_dag_run_state: QueryLastDagRunStateFilter,
editable_dags_filter: EditableDagsFilterDep,
session: SessionDep,
update_mask: list[str] | None = Query(None),
@@ -318,18 +316,14 @@ def patch_dags(
except ValidationError as e:
raise RequestValidationError(errors=e.errors())
- # todo: this is not used?
- update_mask = ["is_paused"]
-
dags_select, total_entries = paginated_select(
- statement=generate_dag_with_latest_run_query(),
+ statement=select(DagModel),
filters=[
exclude_stale,
paused,
dag_id_pattern,
tags,
owners,
- last_dag_run_state,
editable_dags_filter,
],
order_by=None,
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 1c532823f25..620dab69dc6 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -3875,7 +3875,6 @@ export const useDagRunServicePatchDagRun = <
* @param data.dagIdPattern
* @param data.excludeStale
* @param data.paused
- * @param data.lastDagRunState
* @returns DAGCollectionResponse Successful Response
* @throws ApiError
*/
@@ -3891,7 +3890,6 @@ export const useDagServicePatchDags = <
{
dagIdPattern?: string;
excludeStale?: boolean;
- lastDagRunState?: DagRunState;
limit?: number;
offset?: number;
owners?: string[];
@@ -3912,7 +3910,6 @@ export const useDagServicePatchDags = <
{
dagIdPattern?: string;
excludeStale?: boolean;
- lastDagRunState?: DagRunState;
limit?: number;
offset?: number;
owners?: string[];
@@ -3927,7 +3924,6 @@ export const useDagServicePatchDags = <
mutationFn: ({
dagIdPattern,
excludeStale,
- lastDagRunState,
limit,
offset,
owners,
@@ -3940,7 +3936,6 @@ export const useDagServicePatchDags = <
DagService.patchDags({
dagIdPattern,
excludeStale,
- lastDagRunState,
limit,
offset,
owners,
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index 1c5847ae8d0..cb7cf86d49c 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -1570,7 +1570,6 @@ export class DagService {
* @param data.dagIdPattern
* @param data.excludeStale
* @param data.paused
- * @param data.lastDagRunState
* @returns DAGCollectionResponse Successful Response
* @throws ApiError
*/
@@ -1588,7 +1587,6 @@ export class DagService {
dag_id_pattern: data.dagIdPattern,
exclude_stale: data.excludeStale,
paused: data.paused,
- last_dag_run_state: data.lastDagRunState,
},
body: data.requestBody,
mediaType: "application/json",
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 9e28ab66aac..18322378fd5 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -2098,7 +2098,6 @@ export type GetDagsResponse = DAGCollectionResponse;
export type PatchDagsData = {
dagIdPattern?: string | null;
excludeStale?: boolean;
- lastDagRunState?: DagRunState | null;
limit?: number;
offset?: number;
owners?: Array<string>;
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
index 8e7ee00b835..f207de91d2e 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
@@ -356,9 +356,9 @@ class TestPatchDags(TestDagEndpoint):
assert response.status_code == expected_status_code
if expected_status_code == 200:
body = response.json()
- assert [dag["dag_id"] for dag in body["dags"]] == expected_ids
- paused_dag_ids = [dag["dag_id"] for dag in body["dags"] if
dag["is_paused"]]
- assert paused_dag_ids == expected_paused_ids
+ assert {dag["dag_id"] for dag in body["dags"]} == set(expected_ids)
+ paused_dag_ids = {dag["dag_id"] for dag in body["dags"] if
dag["is_paused"]}
+ assert paused_dag_ids == set(expected_paused_ids)
check_last_log(session, dag_id=DAG1_ID, event="patch_dag",
logical_date=None)
@mock.patch("airflow.api_fastapi.auth.managers.base_auth_manager.BaseAuthManager.get_authorized_dag_ids")
@@ -371,7 +371,7 @@ class TestPatchDags(TestDagEndpoint):
assert response.status_code == 200
body = response.json()
- assert [dag["dag_id"] for dag in body["dags"]] == [DAG1_ID, DAG2_ID]
+ assert {dag["dag_id"] for dag in body["dags"]} == {DAG1_ID, DAG2_ID}
def test_patch_dags_should_response_401(self, unauthenticated_test_client):
response = unauthenticated_test_client.patch("/dags",
json={"is_paused": True})