This is an automated email from the ASF dual-hosted git repository. rahulvats pushed a commit to branch execution-api-consolidate-2026-04-06 in repository https://gitbox.apache.org/repos/asf/airflow.git
commit bd772694188e3e3679aa38b8b0edc7222d943c68 Author: Software Developer <[email protected]> AuthorDate: Mon Mar 30 14:06:45 2026 +0200 Add log buffering to increase performance when streaming big logs (#63180) * *add buffering back to speed logs loading. * *fix scrolling for big logs. * *return back `# type: ignore[arg-type]`. apply PR remarks. * *add buffering back to speed logs loading. * *fix scrolling for big logs. * *return back `# type: ignore[arg-type]`. apply PR remarks. * *make buffer size configurable. * *add back some comments. --- .../api_fastapi/core_api/routes/public/log.py | 20 +++- .../api_fastapi/execution_api/versions/__init__.py | 19 ++-- .../execution_api/versions/v2025_11_07.py | 54 ----------- .../execution_api/versions/v2025_12_08.py | 41 -------- .../versions/{v2026_03_31.py => v2026_04_06.py} | 105 ++++++++++++++++----- .../execution_api/versions/v2026_04_13.py | 28 ------ .../src/airflow/config_templates/config.yml | 8 ++ .../src/pages/TaskInstance/Logs/TaskLogContent.tsx | 51 +++++++++- .../execution_api/versions/head/test_dag_runs.py | 2 +- .../execution_api/versions/v2026_03_31/__init__.py | 16 ---- .../{v2025_11_07 => v2026_04_06}/__init__.py | 0 .../{v2025_11_07 => v2026_04_06}/test_dag_runs.py | 3 +- .../{v2026_03_31 => v2026_04_06}/test_dags.py | 3 +- .../test_task_instances.py | 4 +- .../src/airflow/sdk/api/datamodels/_generated.py | 2 +- 15 files changed, 177 insertions(+), 179 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py index 20379876ebe..6b41aa3f5ac 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py @@ -19,6 +19,7 @@ from __future__ import annotations import contextlib import textwrap +from collections.abc import Generator, Iterable from fastapi import Depends, HTTPException, Request, status from fastapi.responses import StreamingResponse @@ -35,11 +36,14 @@ from airflow.api_fastapi.common.types import Mimetype from airflow.api_fastapi.core_api.datamodels.log import ExternalLogUrlResponse, TaskInstancesLogResponse from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.security import DagAccessEntity, requires_access_dag +from airflow.configuration import conf from airflow.exceptions import TaskNotFound from airflow.models import TaskInstance, Trigger from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.utils.log.log_reader import TaskLogReader +_NDJSON_BATCH_SIZE = conf.getint("api", "log_stream_buffer_size") + task_instances_log_router = AirflowRouter( tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" ) @@ -59,6 +63,19 @@ ndjson_example_response_for_get_log = { } +def _buffered_ndjson_stream( + raw_stream: Iterable[str], +) -> Generator[str, None, None]: + buf: list[str] = [] + for line in raw_stream: + buf.append(line) + if len(buf) >= _NDJSON_BATCH_SIZE: + yield "".join(buf) + buf.clear() + if buf: + yield "".join(buf) + + @task_instances_log_router.get( "/{task_id}/logs/{try_number}", responses={ @@ -146,7 +163,8 @@ def get_log( if accept == Mimetype.NDJSON: # only specified application/x-ndjson will return streaming response # LogMetadata(TypedDict) is used as type annotation for log_reader; added ignore to suppress mypy error - log_stream = task_log_reader.read_log_stream(ti, try_number, metadata) # type: ignore[arg-type] + raw_stream = task_log_reader.read_log_stream(ti, try_number, metadata) # type: ignore[arg-type] + log_stream = _buffered_ndjson_stream(raw_stream) headers = None if not metadata.get("end_of_log", False): headers = { diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index 2cbe2e3007b..646831501a7 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -28,31 +28,30 @@ from airflow.api_fastapi.execution_api.versions.v2025_08_10 import ( from airflow.api_fastapi.execution_api.versions.v2025_09_23 import AddDagVersionIdField from airflow.api_fastapi.execution_api.versions.v2025_10_27 import MakeDagRunConfNullable from airflow.api_fastapi.execution_api.versions.v2025_11_05 import AddTriggeringUserNameField -from airflow.api_fastapi.execution_api.versions.v2025_11_07 import AddPartitionKeyField -from airflow.api_fastapi.execution_api.versions.v2025_12_08 import ( +from airflow.api_fastapi.execution_api.versions.v2026_04_06 import ( + AddDagEndpoint, AddDagRunDetailEndpoint, - MovePreviousRunEndpoint, -) -from airflow.api_fastapi.execution_api.versions.v2026_03_31 import ( AddNoteField, + AddPartitionKeyField, MakeDagRunStartDateNullable, ModifyDeferredTaskKwargsToJsonValue, + MovePreviousRunEndpoint, RemoveUpstreamMapIndexesField, ) -from airflow.api_fastapi.execution_api.versions.v2026_04_13 import AddDagEndpoint bundle = VersionBundle( HeadVersion(), - Version("2026-04-13", AddDagEndpoint), Version( - "2026-03-31", + "2026-04-06", + AddPartitionKeyField, + MovePreviousRunEndpoint, + AddDagRunDetailEndpoint, MakeDagRunStartDateNullable, ModifyDeferredTaskKwargsToJsonValue, RemoveUpstreamMapIndexesField, AddNoteField, + AddDagEndpoint, ), - Version("2025-12-08", MovePreviousRunEndpoint, AddDagRunDetailEndpoint), - Version("2025-11-07", AddPartitionKeyField), Version("2025-11-05", AddTriggeringUserNameField), Version("2025-10-27", MakeDagRunConfNullable), Version("2025-09-23", AddDagVersionIdField), diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py deleted file mode 100644 index 117ba492455..00000000000 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_11_07.py +++ /dev/null @@ -1,54 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from __future__ import annotations - -from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema - -from airflow.api_fastapi.execution_api.datamodels.asset_event import ( - AssetEventResponse, - AssetEventsResponse, - DagRunAssetReference, -) -from airflow.api_fastapi.execution_api.datamodels.dagrun import TriggerDAGRunPayload -from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun, TIRunContext - - -class AddPartitionKeyField(VersionChange): - """Add the `partition_key` field to DagRun model.""" - - description = __doc__ - - instructions_to_migrate_to_previous_version = ( - schema(DagRun).field("partition_key").didnt_exist, - schema(AssetEventResponse).field("partition_key").didnt_exist, - schema(TriggerDAGRunPayload).field("partition_key").didnt_exist, - schema(DagRunAssetReference).field("partition_key").didnt_exist, - ) - - @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type] - def remove_partition_key_from_dag_run(response: ResponseInfo) -> None: # type: ignore[misc] - """Remove the `partition_key` field from the dag_run object when converting to the previous version.""" - if "dag_run" in response.body and isinstance(response.body["dag_run"], dict): - response.body["dag_run"].pop("partition_key", None) - - @convert_response_to_previous_version_for(AssetEventsResponse) # type: ignore[arg-type] - def remove_partition_key_from_asset_events(response: ResponseInfo) -> None: # type: ignore[misc] - """Remove the `partition_key` field from the dag_run object when converting to the previous version.""" - events = response.body["asset_events"] - for elem in events: - elem.pop("partition_key", None) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_12_08.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_12_08.py deleted file mode 100644 index 3394036bf15..00000000000 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_12_08.py +++ /dev/null @@ -1,41 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from __future__ import annotations - -from cadwyn import VersionChange, endpoint - - -class MovePreviousRunEndpoint(VersionChange): - """Add new previous-run endpoint and migrate old endpoint.""" - - description = __doc__ - - instructions_to_migrate_to_previous_version = ( - endpoint("/dag-runs/previous", ["GET"]).didnt_exist, - endpoint("/dag-runs/{dag_id}/previous", ["GET"]).existed, - ) - - -class AddDagRunDetailEndpoint(VersionChange): - """Add dag run detail endpoint.""" - - description = __doc__ - - instructions_to_migrate_to_previous_version = ( - endpoint("/dag-runs/{dag_id}/{run_id}", ["GET"]).didnt_exist, - ) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py similarity index 62% rename from airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py rename to airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py index 2d14493e81f..85ec1f2a608 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py @@ -19,9 +19,15 @@ from __future__ import annotations from typing import Any -from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema +from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, endpoint, schema from airflow.api_fastapi.common.types import UtcDateTime +from airflow.api_fastapi.execution_api.datamodels.asset_event import ( + AssetEventResponse, + AssetEventsResponse, + DagRunAssetReference, +) +from airflow.api_fastapi.execution_api.datamodels.dagrun import TriggerDAGRunPayload from airflow.api_fastapi.execution_api.datamodels.taskinstance import ( DagRun, TIDeferredStatePayload, @@ -29,6 +35,79 @@ from airflow.api_fastapi.execution_api.datamodels.taskinstance import ( ) +class AddPartitionKeyField(VersionChange): + """Add the `partition_key` field to DagRun model.""" + + description = __doc__ + + instructions_to_migrate_to_previous_version = ( + schema(DagRun).field("partition_key").didnt_exist, + schema(AssetEventResponse).field("partition_key").didnt_exist, + schema(TriggerDAGRunPayload).field("partition_key").didnt_exist, + schema(DagRunAssetReference).field("partition_key").didnt_exist, + ) + + @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type] + def remove_partition_key_from_dag_run(response: ResponseInfo) -> None: # type: ignore[misc] + """Remove the `partition_key` field from the dag_run object when converting to the previous version.""" + if "dag_run" in response.body and isinstance(response.body["dag_run"], dict): + response.body["dag_run"].pop("partition_key", None) + + @convert_response_to_previous_version_for(AssetEventsResponse) # type: ignore[arg-type] + def remove_partition_key_from_asset_events(response: ResponseInfo) -> None: # type: ignore[misc] + """Remove the `partition_key` field from the dag_run object when converting to the previous version.""" + events = response.body["asset_events"] + for elem in events: + elem.pop("partition_key", None) + + +class MovePreviousRunEndpoint(VersionChange): + """Add new previous-run endpoint and migrate old endpoint.""" + + description = __doc__ + + instructions_to_migrate_to_previous_version = ( + endpoint("/dag-runs/previous", ["GET"]).didnt_exist, + endpoint("/dag-runs/{dag_id}/previous", ["GET"]).existed, + ) + + +class AddDagRunDetailEndpoint(VersionChange): + """Add dag run detail endpoint.""" + + description = __doc__ + + instructions_to_migrate_to_previous_version = ( + endpoint("/dag-runs/{dag_id}/{run_id}", ["GET"]).didnt_exist, + ) + + +class MakeDagRunStartDateNullable(VersionChange): + """Make DagRun.start_date field nullable for runs that haven't started yet.""" + + description = __doc__ + + instructions_to_migrate_to_previous_version = (schema(DagRun).field("start_date").had(type=UtcDateTime),) + + @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type] + def ensure_start_date_in_ti_run_context(response: ResponseInfo) -> None: # type: ignore[misc] + """ + Ensure start_date is never None in DagRun for previous API versions. + + Older Task SDK clients expect start_date to be non-nullable. When the + DagRun hasn't started yet (e.g. queued), fall back to run_after. + """ + dag_run = response.body.get("dag_run") + if isinstance(dag_run, dict) and dag_run.get("start_date") is None: + dag_run["start_date"] = dag_run.get("run_after") + + @convert_response_to_previous_version_for(DagRun) # type: ignore[arg-type] + def ensure_start_date_in_dag_run(response: ResponseInfo) -> None: # type: ignore[misc] + """Ensure start_date is never None in direct DagRun responses for previous API versions.""" + if response.body.get("start_date") is None: + response.body["start_date"] = response.body.get("run_after") + + class ModifyDeferredTaskKwargsToJsonValue(VersionChange): """Change the types of `trigger_kwargs` and `next_kwargs` in TIDeferredStatePayload to JsonValue.""" @@ -71,27 +150,9 @@ class AddNoteField(VersionChange): response.body["dag_run"].pop("note", None) -class MakeDagRunStartDateNullable(VersionChange): - """Make DagRun.start_date field nullable for runs that haven't started yet.""" +class AddDagEndpoint(VersionChange): + """Add the `/dags/{dag_id}` endpoint.""" description = __doc__ - instructions_to_migrate_to_previous_version = (schema(DagRun).field("start_date").had(type=UtcDateTime),) - - @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type] - def ensure_start_date_in_ti_run_context(response: ResponseInfo) -> None: # type: ignore[misc] - """ - Ensure start_date is never None in DagRun for previous API versions. - - Older Task SDK clients expect start_date to be non-nullable. When the - DagRun hasn't started yet (e.g. queued), fall back to run_after. - """ - dag_run = response.body.get("dag_run") - if isinstance(dag_run, dict) and dag_run.get("start_date") is None: - dag_run["start_date"] = dag_run.get("run_after") - - @convert_response_to_previous_version_for(DagRun) # type: ignore[arg-type] - def ensure_start_date_in_dag_run(response: ResponseInfo) -> None: # type: ignore[misc] - """Ensure start_date is never None in direct DagRun responses for previous API versions.""" - if response.body.get("start_date") is None: - response.body["start_date"] = response.body.get("run_after") + instructions_to_migrate_to_previous_version = (endpoint("/dags/{dag_id}", ["GET"]).didnt_exist,) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_13.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_13.py deleted file mode 100644 index 95da513d7bc..00000000000 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_13.py +++ /dev/null @@ -1,28 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from __future__ import annotations - -from cadwyn import VersionChange, endpoint - - -class AddDagEndpoint(VersionChange): - """Add the `/dags/{dag_id}` endpoint.""" - - description = __doc__ - - instructions_to_migrate_to_previous_version = (endpoint("/dags/{dag_id}", ["GET"]).didnt_exist,) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 55313198d5b..2f1c63a21c1 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -1700,6 +1700,14 @@ api: type: string example: path/to/logging_config.yaml default: ~ + log_stream_buffer_size: + description: | + Number of log lines to buffer before flushing to the client when streaming task logs + in NDJSON format. Larger values reduce HTTP overhead but increase time-to-first-byte. + version_added: 3.2.0 + type: integer + example: ~ + default: "500" ssl_cert: description: | Paths to the SSL certificate and key for the api server. When both are diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/TaskLogContent.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/TaskLogContent.tsx index f6df9e1c420..59ba801bacc 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/TaskLogContent.tsx +++ b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/TaskLogContent.tsx @@ -18,7 +18,7 @@ */ import { Box, Code, VStack, IconButton } from "@chakra-ui/react"; import { useVirtualizer } from "@tanstack/react-virtual"; -import { type JSX, useLayoutEffect, useRef } from "react"; +import { type JSX, useLayoutEffect, useRef, useCallback, useEffect } from "react"; import { useHotkeys } from "react-hotkeys-hook"; import { useTranslation } from "react-i18next"; import { FiChevronDown, FiChevronUp } from "react-icons/fi"; @@ -37,6 +37,9 @@ export type TaskLogContentProps = { readonly wrap: boolean; }; +// How close to the bottom (in px) before we consider the user "at the bottom" +const SCROLL_BOTTOM_THRESHOLD = 100; + const ScrollToButton = ({ direction, onClick, @@ -83,6 +86,12 @@ export const TaskLogContent = ({ error, isLoading, logError, parsedLogs, wrap }: const hash = location.hash.replace("#", ""); const parentRef = useRef<HTMLDivElement | null>(null); + // Track whether user is at the bottom so we don't hijack their scroll position + // if they scrolled up to read something + const isAtBottomRef = useRef<boolean>(true); + // Track previous log count to detect new lines arriving + const prevLogCountRef = useRef<number>(0); + const rowVirtualizer = useVirtualizer({ count: parsedLogs.length, estimateSize: () => 20, @@ -94,6 +103,44 @@ export const TaskLogContent = ({ error, isLoading, logError, parsedLogs, wrap }: const containerHeight = rowVirtualizer.scrollElement?.clientHeight ?? 0; const showScrollButtons = parsedLogs.length > 1 && contentHeight > containerHeight; + // Check if user is near the bottom on scroll + const handleScroll = useCallback(() => { + const el = parentRef.current; + + if (!el) { + return; + } + const distanceFromBottom = el.scrollHeight - el.scrollTop - el.clientHeight; + + isAtBottomRef.current = distanceFromBottom <= SCROLL_BOTTOM_THRESHOLD; + }, []); + + useEffect(() => { + const el = parentRef.current; + + el?.addEventListener("scroll", handleScroll, { passive: true }); + + return () => el?.removeEventListener("scroll", handleScroll); + }, [handleScroll]); + + // Auto-scroll to bottom when: + // 1. Logs first load (prevLogCount was 0) + // 2. New lines arrive AND user was already at the bottom + useLayoutEffect(() => { + if (parsedLogs.length === 0) { + return; + } + + const isFirstLoad = prevLogCountRef.current === 0; + const hasNewLines = parsedLogs.length > prevLogCountRef.current; + + if ((isFirstLoad || (hasNewLines && isAtBottomRef.current)) && !location.hash) { + rowVirtualizer.scrollToIndex(parsedLogs.length - 1, { align: "end" }); + } + + prevLogCountRef.current = parsedLogs.length; + }, [parsedLogs.length, rowVirtualizer]); + useLayoutEffect(() => { if (location.hash && !isLoading) { rowVirtualizer.scrollToIndex(Math.min(Number(hash) + 5, parsedLogs.length - 1)); @@ -112,8 +159,10 @@ export const TaskLogContent = ({ error, isLoading, logError, parsedLogs, wrap }: } if (to === "top") { + isAtBottomRef.current = false; scrollToTop({ element: el, virtualizer: rowVirtualizer }); } else { + isAtBottomRef.current = true; scrollToBottom({ element: el, virtualizer: rowVirtualizer }); } }; diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py index 2aa3be51220..a2910313951 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py @@ -264,7 +264,7 @@ class TestDagRunDetail: def test_get_state(self, client, session, dag_maker): dag_id = "test_dag_id" # Named deliberately to check if this routes correctly. - # See v2025_11_07.test_dag_runs::test_get_previous_dag_run_redirect + # See v2026_04_06.test_dag_runs::test_get_previous_dag_run_redirect run_id = "previous" with dag_maker(dag_id=dag_id, schedule=None, session=session, serialized=True): diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/__init__.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/__init__.py deleted file mode 100644 index 13a83393a91..00000000000 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_11_07/__init__.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/__init__.py similarity index 100% rename from airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_11_07/__init__.py rename to airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/__init__.py diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_11_07/test_dag_runs.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_dag_runs.py similarity index 93% rename from airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_11_07/test_dag_runs.py rename to airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_dag_runs.py index c2821dce55b..a3c7ff77d14 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_11_07/test_dag_runs.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_dag_runs.py @@ -27,7 +27,8 @@ pytestmark = pytest.mark.db_test @pytest.fixture def ver_client(client): - client.headers["Airflow-API-Version"] = "2025-11-07" + """Last released execution API (before 2026-04-06); uses legacy previous-run path.""" + client.headers["Airflow-API-Version"] = "2025-11-05" return client diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/test_dags.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_dags.py similarity index 89% rename from airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/test_dags.py rename to airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_dags.py index 72693443944..5fe7fb26fda 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_dags.py @@ -24,7 +24,8 @@ pytestmark = pytest.mark.db_test @pytest.fixture def old_ver_client(client): - client.headers["Airflow-API-Version"] = "2026-03-31" + """Last released execution API before `/dags/{dag_id}` was added.""" + client.headers["Airflow-API-Version"] = "2025-11-05" return client diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_task_instances.py similarity index 96% rename from airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/test_task_instances.py rename to airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_task_instances.py index 7fb44ce7ebe..8117ac6b69c 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_task_instances.py @@ -40,8 +40,8 @@ RUN_PATCH_BODY = { @pytest.fixture def old_ver_client(client): - """Client configured to use API version before start_date nullable change.""" - client.headers["Airflow-API-Version"] = "2025-12-08" + """Last released execution API before nullable DagRun.start_date (2026-04-06 bundle).""" + client.headers["Airflow-API-Version"] = "2025-11-05" return client diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index b6c08e9d76c..e08f00562d3 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -27,7 +27,7 @@ from uuid import UUID from pydantic import AwareDatetime, BaseModel, ConfigDict, Field, JsonValue, RootModel -API_VERSION: Final[str] = "2026-04-13" +API_VERSION: Final[str] = "2026-04-06" class AssetAliasReferenceAssetEventDagRun(BaseModel):
