This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b5712e7c17 AIP-84 Migrate the public endpoint DAG Details to FastAPI
(#42631)
b5712e7c17 is described below
commit b5712e7c1710fadb2ab7425c20e7982ce03e5d2e
Author: Omkar P <[email protected]>
AuthorDate: Thu Oct 3 14:08:01 2024 +0530
AIP-84 Migrate the public endpoint DAG Details to FastAPI (#42631)
* Migrate the public endpoint DAG Details to FastAPI
* Add comment for computed_field, remove unused import
* Update pendulum import path
* Re-run breeze static checks
* Add openapi-gen for DAG Details API
* Resolve review comments, test has_task_concurrency_limits
* Remove unused import
* Use lambda for aliases, re-run breeze static checks
* Remove duplicate entry
* Use simpler approach for alias
---
airflow/api_connexion/endpoints/dag_endpoint.py | 1 +
airflow/api_fastapi/openapi/v1-generated.yaml | 288 +++++++++++++++++
airflow/api_fastapi/serializers/dags.py | 61 +++-
airflow/api_fastapi/views/public/dags.py | 33 +-
airflow/ui/openapi-gen/queries/common.ts | 16 +
airflow/ui/openapi-gen/queries/prefetch.ts | 20 ++
airflow/ui/openapi-gen/queries/queries.ts | 26 ++
airflow/ui/openapi-gen/queries/suspense.ts | 26 ++
airflow/ui/openapi-gen/requests/schemas.gen.ts | 404 ++++++++++++++++++++++++
airflow/ui/openapi-gen/requests/services.gen.ts | 29 ++
airflow/ui/openapi-gen/requests/types.gen.ts | 93 ++++++
tests/api_fastapi/views/public/test_dags.py | 89 +++++-
12 files changed, 1075 insertions(+), 11 deletions(-)
diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py
b/airflow/api_connexion/endpoints/dag_endpoint.py
index 5d10a97ded..3d0d3dd8bf 100644
--- a/airflow/api_connexion/endpoints/dag_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_endpoint.py
@@ -70,6 +70,7 @@ def get_dag(
)
+@mark_fastapi_migration_done
@security.requires_access_dag("GET")
@provide_session
def get_dag_details(
diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml
b/airflow/api_fastapi/openapi/v1-generated.yaml
index a54e0e4ca5..ce488a996a 100644
--- a/airflow/api_fastapi/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/openapi/v1-generated.yaml
@@ -252,6 +252,57 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /public/dags/{dag_id}/details:
+ get:
+ tags:
+ - DAG
+ summary: Get Dag Details
+ description: Get details of DAG.
+ operationId: get_dag_details
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/DAGDetailsResponse'
+ '400':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Bad Request
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unprocessable Entity
/public/dags/{dag_id}:
patch:
tags:
@@ -378,6 +429,243 @@ components:
- total_entries
title: DAGCollectionResponse
description: DAG Collection serializer for responses.
+ DAGDetailsResponse:
+ properties:
+ dag_id:
+ type: string
+ title: Dag Id
+ dag_display_name:
+ type: string
+ title: Dag Display Name
+ is_paused:
+ type: boolean
+ title: Is Paused
+ is_active:
+ type: boolean
+ title: Is Active
+ last_parsed_time:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Last Parsed Time
+ last_pickled:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Last Pickled
+ last_expired:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Last Expired
+ scheduler_lock:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Scheduler Lock
+ pickle_id:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Pickle Id
+ default_view:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Default View
+ fileloc:
+ type: string
+ title: Fileloc
+ description:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Description
+ timetable_summary:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Timetable Summary
+ timetable_description:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Timetable Description
+ tags:
+ items:
+ $ref: '#/components/schemas/DagTagPydantic'
+ type: array
+ title: Tags
+ max_active_tasks:
+ type: integer
+ title: Max Active Tasks
+ max_active_runs:
+ anyOf:
+ - type: integer
+ - type: 'null'
+ title: Max Active Runs
+ max_consecutive_failed_dag_runs:
+ type: integer
+ title: Max Consecutive Failed Dag Runs
+ has_task_concurrency_limits:
+ type: boolean
+ title: Has Task Concurrency Limits
+ has_import_errors:
+ type: boolean
+ title: Has Import Errors
+ next_dagrun:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Next Dagrun
+ next_dagrun_data_interval_start:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Next Dagrun Data Interval Start
+ next_dagrun_data_interval_end:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Next Dagrun Data Interval End
+ next_dagrun_create_after:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Next Dagrun Create After
+ owners:
+ items:
+ type: string
+ type: array
+ title: Owners
+ catchup:
+ type: boolean
+ title: Catchup
+ dag_run_timeout:
+ anyOf:
+ - type: string
+ format: duration
+ - type: 'null'
+ title: Dag Run Timeout
+ dataset_expression:
+ anyOf:
+ - type: object
+ - type: 'null'
+ title: Dataset Expression
+ doc_md:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Doc Md
+ start_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Start Date
+ end_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: End Date
+ is_paused_upon_creation:
+ anyOf:
+ - type: boolean
+ - type: 'null'
+ title: Is Paused Upon Creation
+ orientation:
+ type: string
+ title: Orientation
+ params:
+ anyOf:
+ - type: object
+ - type: 'null'
+ title: Params
+ render_template_as_native_obj:
+ type: boolean
+ title: Render Template As Native Obj
+ template_search_path:
+ anyOf:
+ - items:
+ type: string
+ type: array
+ - type: 'null'
+ title: Template Search Path
+ timezone:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Timezone
+ last_parsed:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Last Parsed
+ file_token:
+ type: string
+ title: File Token
+ description: Return file token.
+ readOnly: true
+ concurrency:
+ type: integer
+ title: Concurrency
+ description: Return max_active_tasks as concurrency.
+ readOnly: true
+ type: object
+ required:
+ - dag_id
+ - dag_display_name
+ - is_paused
+ - is_active
+ - last_parsed_time
+ - last_pickled
+ - last_expired
+ - scheduler_lock
+ - pickle_id
+ - default_view
+ - fileloc
+ - description
+ - timetable_summary
+ - timetable_description
+ - tags
+ - max_active_tasks
+ - max_active_runs
+ - max_consecutive_failed_dag_runs
+ - has_task_concurrency_limits
+ - has_import_errors
+ - next_dagrun
+ - next_dagrun_data_interval_start
+ - next_dagrun_data_interval_end
+ - next_dagrun_create_after
+ - owners
+ - catchup
+ - dag_run_timeout
+ - dataset_expression
+ - doc_md
+ - start_date
+ - end_date
+ - is_paused_upon_creation
+ - orientation
+ - params
+ - render_template_as_native_obj
+ - template_search_path
+ - timezone
+ - last_parsed
+ - file_token
+ - concurrency
+ title: DAGDetailsResponse
+ description: Specific serializer for DAG Details responses.
DAGPatchBody:
properties:
is_paused:
diff --git a/airflow/api_fastapi/serializers/dags.py
b/airflow/api_fastapi/serializers/dags.py
index 59b47bdef9..17677054c4 100644
--- a/airflow/api_fastapi/serializers/dags.py
+++ b/airflow/api_fastapi/serializers/dags.py
@@ -17,12 +17,16 @@
from __future__ import annotations
-from datetime import datetime
-from typing import Any
+from collections import abc
+from datetime import datetime, timedelta
+from typing import Any, Iterable
from itsdangerous import URLSafeSerializer
+from pendulum.tz.timezone import FixedTimezone, Timezone
from pydantic import (
+ AliasChoices,
BaseModel,
+ Field,
computed_field,
field_validator,
)
@@ -93,3 +97,56 @@ class DAGCollectionResponse(BaseModel):
dags: list[DAGResponse]
total_entries: int
+
+
+class DAGDetailsResponse(DAGResponse):
+ """Specific serializer for DAG Details responses."""
+
+ catchup: bool
+ dag_run_timeout: timedelta | None = Field(
+ validation_alias=AliasChoices("dag_run_timeout", "dagrun_timeout")
+ )
+ dataset_expression: dict | None
+ doc_md: str | None
+ start_date: datetime | None
+ end_date: datetime | None
+ is_paused_upon_creation: bool | None
+ orientation: str
+ params: abc.MutableMapping | None
+ render_template_as_native_obj: bool
+ template_search_path: Iterable[str] | None = Field(
+ validation_alias=AliasChoices("template_search_path", "template_searchpath")
+ )
+ timezone: str | None
+ last_parsed: datetime | None = Field(validation_alias=AliasChoices("last_parsed", "last_loaded"))
+
+ @field_validator("timezone", mode="before")
+ @classmethod
+ def get_timezone(cls, tz: Timezone | FixedTimezone) -> str | None:
+ """Convert timezone attribute to string representation."""
+ if tz is None:
+ return None
+ return str(tz)
+
+ @field_validator("timetable_summary", mode="before")
+ @classmethod
+ def get_timetable_summary(cls, tts: str | None) -> str | None:
+ """Validate the string representation of timetable_summary."""
+ if tts is None or tts == "None":
+ return None
+ return str(tts)
+
+ @field_validator("params", mode="before")
+ @classmethod
+ def get_params(cls, params: abc.MutableMapping | None) -> dict | None:
+ """Convert params attribute to dict representation."""
+ if params is None:
+ return None
+ return {param_name: param_val.dump() for param_name, param_val in params.items()}
+
+ # Mypy issue https://github.com/python/mypy/issues/1362
+ @computed_field # type: ignore[misc]
+ @property
+ def concurrency(self) -> int:
+ """Return max_active_tasks as concurrency."""
+ return self.max_active_tasks
diff --git a/airflow/api_fastapi/views/public/dags.py
b/airflow/api_fastapi/views/public/dags.py
index 3761d593d2..ef76e18450 100644
--- a/airflow/api_fastapi/views/public/dags.py
+++ b/airflow/api_fastapi/views/public/dags.py
@@ -17,7 +17,7 @@
from __future__ import annotations
-from fastapi import Depends, HTTPException, Query
+from fastapi import Depends, HTTPException, Query, Request
from sqlalchemy import update
from sqlalchemy.orm import Session
from typing_extensions import Annotated
@@ -41,9 +41,14 @@ from airflow.api_fastapi.parameters import (
QueryTagsFilter,
SortParam,
)
-from airflow.api_fastapi.serializers.dags import DAGCollectionResponse, DAGPatchBody, DAGResponse
+from airflow.api_fastapi.serializers.dags import (
+ DAGCollectionResponse,
+ DAGDetailsResponse,
+ DAGPatchBody,
+ DAGResponse,
+)
from airflow.api_fastapi.views.router import AirflowRouter
-from airflow.models import DagModel
+from airflow.models import DAG, DagModel
dags_router = AirflowRouter(tags=["DAG"])
@@ -87,6 +92,28 @@ async def get_dags(
)
+@dags_router.get(
+ "/dags/{dag_id}/details", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422])
+)
+async def get_dag_details(
+ dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request
+) -> DAGDetailsResponse:
+ """Get details of DAG."""
+ dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+ if not dag:
+ raise HTTPException(404, f"Dag with id {dag_id} was not found")
+
+ dag_model: DagModel = session.get(DagModel, dag_id)
+ if not dag_model:
+ raise HTTPException(404, f"Unable to obtain dag with id {dag_id} from session")
+
+ for key, value in dag.__dict__.items():
+ if not key.startswith("_") and not hasattr(dag_model, key):
+ setattr(dag_model, key, value)
+
+ return DAGDetailsResponse.model_validate(dag_model, from_attributes=True)
+
+
@dags_router.patch("/dags/{dag_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404]))
async def patch_dag(
dag_id: str,
diff --git a/airflow/ui/openapi-gen/queries/common.ts
b/airflow/ui/openapi-gen/queries/common.ts
index fcddded7dc..f5bc875a56 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -74,6 +74,22 @@ export const UseDagServiceGetDagsKeyFn = (
},
]),
];
+export type DagServiceGetDagDetailsDefaultResponse = Awaited<
+ ReturnType<typeof DagService.getDagDetails>
+>;
+export type DagServiceGetDagDetailsQueryResult<
+ TData = DagServiceGetDagDetailsDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useDagServiceGetDagDetailsKey = "DagServiceGetDagDetails";
+export const UseDagServiceGetDagDetailsKeyFn = (
+ {
+ dagId,
+ }: {
+ dagId: string;
+ },
+ queryKey?: Array<unknown>,
+) => [useDagServiceGetDagDetailsKey, ...(queryKey ?? [{ dagId }])];
export type DagServicePatchDagsMutationResult = Awaited<
ReturnType<typeof DagService.patchDags>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow/ui/openapi-gen/queries/prefetch.ts
index 95c2c7b737..210c4f7f77 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -94,3 +94,23 @@ export const prefetchUseDagServiceGetDags = (
tags,
}),
});
+/**
+ * Get Dag Details
+ * Get details of DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @returns DAGDetailsResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseDagServiceGetDagDetails = (
+ queryClient: QueryClient,
+ {
+ dagId,
+ }: {
+ dagId: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseDagServiceGetDagDetailsKeyFn({ dagId }),
+ queryFn: () => DagService.getDagDetails({ dagId }),
+ });
diff --git a/airflow/ui/openapi-gen/queries/queries.ts
b/airflow/ui/openapi-gen/queries/queries.ts
index f83c151b91..f12a6504c8 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -118,6 +118,32 @@ export const useDagServiceGetDags = <
}) as TData,
...options,
});
+/**
+ * Get Dag Details
+ * Get details of DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @returns DAGDetailsResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagServiceGetDagDetails = <
+ TData = Common.DagServiceGetDagDetailsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ }: {
+ dagId: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseDagServiceGetDagDetailsKeyFn({ dagId }, queryKey),
+ queryFn: () => DagService.getDagDetails({ dagId }) as TData,
+ ...options,
+ });
/**
* Patch Dags
* Patch multiple DAGs.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow/ui/openapi-gen/queries/suspense.ts
index dc8b99dfb2..d9d62d35e3 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -109,3 +109,29 @@ export const useDagServiceGetDagsSuspense = <
}) as TData,
...options,
});
+/**
+ * Get Dag Details
+ * Get details of DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @returns DAGDetailsResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagServiceGetDagDetailsSuspense = <
+ TData = Common.DagServiceGetDagDetailsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ }: {
+ dagId: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseDagServiceGetDagDetailsKeyFn({ dagId }, queryKey),
+ queryFn: () => DagService.getDagDetails({ dagId }) as TData,
+ ...options,
+ });
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index e8c9b5d70c..e8aae616be 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -20,6 +20,410 @@ export const $DAGCollectionResponse = {
description: "DAG Collection serializer for responses.",
} as const;
+export const $DAGDetailsResponse = {
+ properties: {
+ dag_id: {
+ type: "string",
+ title: "Dag Id",
+ },
+ dag_display_name: {
+ type: "string",
+ title: "Dag Display Name",
+ },
+ is_paused: {
+ type: "boolean",
+ title: "Is Paused",
+ },
+ is_active: {
+ type: "boolean",
+ title: "Is Active",
+ },
+ last_parsed_time: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Last Parsed Time",
+ },
+ last_pickled: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Last Pickled",
+ },
+ last_expired: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Last Expired",
+ },
+ scheduler_lock: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Scheduler Lock",
+ },
+ pickle_id: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Pickle Id",
+ },
+ default_view: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Default View",
+ },
+ fileloc: {
+ type: "string",
+ title: "Fileloc",
+ },
+ description: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Description",
+ },
+ timetable_summary: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Timetable Summary",
+ },
+ timetable_description: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Timetable Description",
+ },
+ tags: {
+ items: {
+ $ref: "#/components/schemas/DagTagPydantic",
+ },
+ type: "array",
+ title: "Tags",
+ },
+ max_active_tasks: {
+ type: "integer",
+ title: "Max Active Tasks",
+ },
+ max_active_runs: {
+ anyOf: [
+ {
+ type: "integer",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Max Active Runs",
+ },
+ max_consecutive_failed_dag_runs: {
+ type: "integer",
+ title: "Max Consecutive Failed Dag Runs",
+ },
+ has_task_concurrency_limits: {
+ type: "boolean",
+ title: "Has Task Concurrency Limits",
+ },
+ has_import_errors: {
+ type: "boolean",
+ title: "Has Import Errors",
+ },
+ next_dagrun: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Next Dagrun",
+ },
+ next_dagrun_data_interval_start: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Next Dagrun Data Interval Start",
+ },
+ next_dagrun_data_interval_end: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Next Dagrun Data Interval End",
+ },
+ next_dagrun_create_after: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Next Dagrun Create After",
+ },
+ owners: {
+ items: {
+ type: "string",
+ },
+ type: "array",
+ title: "Owners",
+ },
+ catchup: {
+ type: "boolean",
+ title: "Catchup",
+ },
+ dag_run_timeout: {
+ anyOf: [
+ {
+ type: "string",
+ format: "duration",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Dag Run Timeout",
+ },
+ dataset_expression: {
+ anyOf: [
+ {
+ type: "object",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Dataset Expression",
+ },
+ doc_md: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Doc Md",
+ },
+ start_date: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Start Date",
+ },
+ end_date: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "End Date",
+ },
+ is_paused_upon_creation: {
+ anyOf: [
+ {
+ type: "boolean",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Is Paused Upon Creation",
+ },
+ orientation: {
+ type: "string",
+ title: "Orientation",
+ },
+ params: {
+ anyOf: [
+ {
+ type: "object",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Params",
+ },
+ render_template_as_native_obj: {
+ type: "boolean",
+ title: "Render Template As Native Obj",
+ },
+ template_search_path: {
+ anyOf: [
+ {
+ items: {
+ type: "string",
+ },
+ type: "array",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Template Search Path",
+ },
+ timezone: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Timezone",
+ },
+ last_parsed: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Last Parsed",
+ },
+ file_token: {
+ type: "string",
+ title: "File Token",
+ description: "Return file token.",
+ readOnly: true,
+ },
+ concurrency: {
+ type: "integer",
+ title: "Concurrency",
+ description: "Return max_active_tasks as concurrency.",
+ readOnly: true,
+ },
+ },
+ type: "object",
+ required: [
+ "dag_id",
+ "dag_display_name",
+ "is_paused",
+ "is_active",
+ "last_parsed_time",
+ "last_pickled",
+ "last_expired",
+ "scheduler_lock",
+ "pickle_id",
+ "default_view",
+ "fileloc",
+ "description",
+ "timetable_summary",
+ "timetable_description",
+ "tags",
+ "max_active_tasks",
+ "max_active_runs",
+ "max_consecutive_failed_dag_runs",
+ "has_task_concurrency_limits",
+ "has_import_errors",
+ "next_dagrun",
+ "next_dagrun_data_interval_start",
+ "next_dagrun_data_interval_end",
+ "next_dagrun_create_after",
+ "owners",
+ "catchup",
+ "dag_run_timeout",
+ "dataset_expression",
+ "doc_md",
+ "start_date",
+ "end_date",
+ "is_paused_upon_creation",
+ "orientation",
+ "params",
+ "render_template_as_native_obj",
+ "template_search_path",
+ "timezone",
+ "last_parsed",
+ "file_token",
+ "concurrency",
+ ],
+ title: "DAGDetailsResponse",
+ description: "Specific serializer for DAG Details responses.",
+} as const;
+
export const $DAGPatchBody = {
properties: {
is_paused: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 24c960d2b7..0aefb56d06 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -9,6 +9,8 @@ import type {
GetDagsResponse,
PatchDagsData,
PatchDagsResponse,
+ GetDagDetailsData,
+ GetDagDetailsResponse,
PatchDagData,
PatchDagResponse,
DeleteConnectionData,
@@ -127,6 +129,33 @@ export class DagService {
});
}
+ /**
+ * Get Dag Details
+ * Get details of DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @returns DAGDetailsResponse Successful Response
+ * @throws ApiError
+ */
+ public static getDagDetails(
+ data: GetDagDetailsData,
+ ): CancelablePromise<GetDagDetailsResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/public/dags/{dag_id}/details",
+ path: {
+ dag_id: data.dagId,
+ },
+ errors: {
+ 400: "Bad Request",
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Unprocessable Entity",
+ },
+ });
+ }
+
/**
* Patch Dag
* Patch the specific DAG.
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index b38d5c00a6..c37106abc8 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -8,6 +8,62 @@ export type DAGCollectionResponse = {
total_entries: number;
};
+/**
+ * Specific serializer for DAG Details responses.
+ */
+export type DAGDetailsResponse = {
+ dag_id: string;
+ dag_display_name: string;
+ is_paused: boolean;
+ is_active: boolean;
+ last_parsed_time: string | null;
+ last_pickled: string | null;
+ last_expired: string | null;
+ scheduler_lock: string | null;
+ pickle_id: string | null;
+ default_view: string | null;
+ fileloc: string;
+ description: string | null;
+ timetable_summary: string | null;
+ timetable_description: string | null;
+ tags: Array<DagTagPydantic>;
+ max_active_tasks: number;
+ max_active_runs: number | null;
+ max_consecutive_failed_dag_runs: number;
+ has_task_concurrency_limits: boolean;
+ has_import_errors: boolean;
+ next_dagrun: string | null;
+ next_dagrun_data_interval_start: string | null;
+ next_dagrun_data_interval_end: string | null;
+ next_dagrun_create_after: string | null;
+ owners: Array<string>;
+ catchup: boolean;
+ dag_run_timeout: string | null;
+ dataset_expression: {
+ [key: string]: unknown;
+ } | null;
+ doc_md: string | null;
+ start_date: string | null;
+ end_date: string | null;
+ is_paused_upon_creation: boolean | null;
+ orientation: string;
+ params: {
+ [key: string]: unknown;
+ } | null;
+ render_template_as_native_obj: boolean;
+ template_search_path: Array<string> | null;
+ timezone: string | null;
+ last_parsed: string | null;
+ /**
+ * Return file token.
+ */
+ readonly file_token: string;
+ /**
+ * Return max_active_tasks as concurrency.
+ */
+ readonly concurrency: number;
+};
+
/**
* Dag Serializer for updatable body.
*/
@@ -126,6 +182,12 @@ export type PatchDagsData = {
export type PatchDagsResponse = DAGCollectionResponse;
+export type GetDagDetailsData = {
+ dagId: string;
+};
+
+export type GetDagDetailsResponse = DAGDetailsResponse;
+
export type PatchDagData = {
dagId: string;
requestBody: DAGPatchBody;
@@ -202,6 +264,37 @@ export type $OpenApiTs = {
};
};
};
+ "/public/dags/{dag_id}/details": {
+ get: {
+ req: GetDagDetailsData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: DAGDetailsResponse;
+ /**
+ * Bad Request
+ */
+ 400: HTTPExceptionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Unprocessable Entity
+ */
+ 422: HTTPExceptionResponse;
+ };
+ };
+ };
"/public/dags/{dag_id}": {
patch: {
req: PatchDagData;
diff --git a/tests/api_fastapi/views/public/test_dags.py
b/tests/api_fastapi/views/public/test_dags.py
index 7b68ebe512..58b3daf35c 100644
--- a/tests/api_fastapi/views/public/test_dags.py
+++ b/tests/api_fastapi/views/public/test_dags.py
@@ -18,6 +18,7 @@ from __future__ import annotations
from datetime import datetime, timezone
+import pendulum
import pytest
from airflow.models.dag import DagModel
@@ -34,8 +35,10 @@ DAG1_ID = "test_dag1"
DAG1_DISPLAY_NAME = "display1"
DAG2_ID = "test_dag2"
DAG2_DISPLAY_NAME = "display2"
+DAG2_START_DATE = datetime(2021, 6, 15, tzinfo=timezone.utc)
DAG3_ID = "test_dag3"
TASK_ID = "op1"
+UTC_JSON_REPR = "UTC" if pendulum.__version__.startswith("3") else "Timezone('UTC')"
@provide_session
@@ -74,7 +77,8 @@ def _create_deactivated_paused_dag(session=None):
@pytest.fixture(autouse=True)
-def setup(dag_maker) -> None:
+@provide_session
+def setup(dag_maker, session=None) -> None:
clear_db_runs()
clear_db_dags()
clear_db_serialized_dags()
@@ -96,15 +100,18 @@ def setup(dag_maker) -> None:
DAG2_ID,
dag_display_name=DAG2_DISPLAY_NAME,
schedule=None,
- start_date=datetime(
- 2021,
- 6,
- 15,
- ),
+ start_date=DAG2_START_DATE,
+ doc_md="details",
+ params={"foo": 1},
+ max_active_tasks=16,
+ max_active_runs=16,
):
EmptyOperator(task_id=TASK_ID)
dag_maker.dagbag.sync_to_db()
+ dag_maker.dag_model.has_task_concurrency_limits = True
+ session.merge(dag_maker.dag_model)
+ session.commit()
_create_deactivated_paused_dag()
@@ -225,3 +232,73 @@ def test_patch_dags(test_client, query_params, body, expected_status_code, expec
assert [dag["dag_id"] for dag in body["dags"]] == expected_ids
paused_dag_ids = [dag["dag_id"] for dag in body["dags"] if dag["is_paused"]]
assert paused_dag_ids == expected_paused_ids
+
+
+@pytest.mark.parametrize(
+ "query_params, dag_id, expected_status_code, dag_display_name, start_date",
+ [
+ ({}, "fake_dag_id", 404, "fake_dag", datetime(2023, 12, 31, tzinfo=timezone.utc)),
+ ({}, DAG2_ID, 200, DAG2_DISPLAY_NAME, DAG2_START_DATE),
+ ],
+)
+def test_dag_details(test_client, query_params, dag_id, expected_status_code, dag_display_name, start_date):
+ response = test_client.get(f"/public/dags/{dag_id}/details", params=query_params)
+ assert response.status_code == expected_status_code
+ if expected_status_code != 200:
+ return
+
+ # Match expected and actual responses below.
+ res_json = response.json()
+ last_parsed = res_json["last_parsed"]
+ last_parsed_time = res_json["last_parsed_time"]
+ file_token = res_json["file_token"]
+ expected = {
+ "catchup": True,
+ "concurrency": 16,
+ "dag_id": dag_id,
+ "dag_display_name": dag_display_name,
+ "dag_run_timeout": None,
+ "dataset_expression": None,
+ "default_view": "grid",
+ "description": None,
+ "doc_md": "details",
+ "end_date": None,
+ "fileloc": "/opt/airflow/tests/api_fastapi/views/public/test_dags.py",
+ "file_token": file_token,
+ "has_import_errors": False,
+ "has_task_concurrency_limits": True,
+ "is_active": True,
+ "is_paused": False,
+ "is_paused_upon_creation": None,
+ "last_expired": None,
+ "last_parsed": last_parsed,
+ "last_parsed_time": last_parsed_time,
+ "last_pickled": None,
+ "max_active_runs": 16,
+ "max_active_tasks": 16,
+ "max_consecutive_failed_dag_runs": 0,
+ "next_dagrun": None,
+ "next_dagrun_create_after": None,
+ "next_dagrun_data_interval_end": None,
+ "next_dagrun_data_interval_start": None,
+ "orientation": "LR",
+ "owners": ["airflow"],
+ "params": {
+ "foo": {
+ "__class": "airflow.models.param.Param",
+ "description": None,
+ "schema": {},
+ "value": 1,
+ }
+ },
+ "pickle_id": None,
+ "render_template_as_native_obj": False,
+ "timetable_summary": None,
+ "scheduler_lock": None,
+ "start_date": start_date.replace(tzinfo=None).isoformat() + "Z",  # pydantic datetime format
+ "tags": [],
+ "template_search_path": None,
+ "timetable_description": "Never, external triggers only",
+ "timezone": UTC_JSON_REPR,
+ }
+ assert res_json == expected