This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ea456d807e6 Add response migration for `consumed_asset_events` events
field (#48940)
ea456d807e6 is described below
commit ea456d807e62b125b3934d8bb080f951d4b02f6f
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue Apr 8 15:38:05 2025 +0100
Add response migration for `consumed_asset_events` events field (#48940)
And, like I should have done in #48888, add tests this time.
The issue was caused by me misunderstanding how Cadwyn works: the
`instructions_to_migrate_to_previous_version` only affect the OpenAPI
schema.
Because the change in this case results in an "invalid" model being
returned, we also have to write our own migration for the response data.
For those curious, the error without this migration function is:
> fastapi.exceptions.ResponseValidationError: 1 validation errors:
> {'type': 'extra_forbidden', 'loc': ('response', 'dag_run',
> 'consumed_asset_events'), 'msg': 'Extra inputs are not permitted', 'input': []}
---
.../execution_api/versions/v2025_04_10.py | 8 +-
.../versions/v2025_03_26/__init__.py} | 14 ---
.../versions/v2025_03_26/test_task_instances.py | 99 ++++++++++++++++++++++
3 files changed, 105 insertions(+), 16 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_04_10.py
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_04_10.py
index c633a66215c..b0f32c1af3e 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_04_10.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_04_10.py
@@ -17,9 +17,9 @@
from __future__ import annotations
-from cadwyn import VersionChange, schema
+from cadwyn import ResponseInfo, VersionChange,
convert_response_to_previous_version_for, schema
-from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun
+from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun,
TIRunContext
class AddConsumedAssetEventsField(VersionChange):
@@ -28,3 +28,7 @@ class AddConsumedAssetEventsField(VersionChange):
description = __doc__
instructions_to_migrate_to_previous_version =
(schema(DagRun).field("consumed_asset_events").didnt_exist,)
+
+ @convert_response_to_previous_version_for(TIRunContext) # type: ignore
+ def remove_consumed_asset_events(response: ResponseInfo): # type: ignore
+ response.body["dag_run"].pop("consumed_asset_events")
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_04_10.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_03_26/__init__.py
similarity index 65%
copy from
airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_04_10.py
copy to
airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_03_26/__init__.py
index c633a66215c..13a83393a91 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_04_10.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_03_26/__init__.py
@@ -14,17 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-from __future__ import annotations
-
-from cadwyn import VersionChange, schema
-
-from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun
-
-
-class AddConsumedAssetEventsField(VersionChange):
- """Add the `consumed_asset_events` to DagRun model."""
-
- description = __doc__
-
- instructions_to_migrate_to_previous_version =
(schema(DagRun).field("consumed_asset_events").didnt_exist,)
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_03_26/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_03_26/test_task_instances.py
new file mode 100644
index 00000000000..39ee9335b10
--- /dev/null
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_03_26/test_task_instances.py
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pytest
+
+from airflow.utils import timezone
+from airflow.utils.state import State
+
+from tests_common.test_utils.db import clear_db_assets, clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+
+DEFAULT_START_DATE = timezone.parse("2024-10-31T11:00:00Z")
+DEFAULT_END_DATE = timezone.parse("2024-10-31T12:00:00Z")
+
+
[email protected]
+def ver_client(client):
+ client.headers["Airflow-API-Version"] = "2025-03-26"
+ return client
+
+
+class TestTIUpdateState:
+ def setup_method(self):
+ clear_db_assets()
+ clear_db_runs()
+
+ def teardown_method(self):
+ clear_db_assets()
+ clear_db_runs()
+
+ def test_ti_run(self, ver_client, session, create_task_instance,
time_machine):
+ """
+ Test that this version of the endpoint works.
+
+ Later versions add a consumed_asset_events field.
+ """
+ instant_str = "2024-09-30T12:00:00Z"
+ instant = timezone.parse(instant_str)
+ time_machine.move_to(instant, tick=False)
+
+ ti = create_task_instance(
+ task_id="test_ti_run_state_to_running",
+ state=State.QUEUED,
+ session=session,
+ start_date=instant,
+ )
+ session.commit()
+
+ response = ver_client.patch(
+ f"/execution/task-instances/{ti.id}/run",
+ json={
+ "state": "running",
+ "hostname": "random-hostname",
+ "unixname": "random-unixname",
+ "pid": 100,
+ "start_date": instant_str,
+ },
+ )
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "dag_run": {
+ "dag_id": "dag",
+ "run_id": "test",
+ "clear_number": 0,
+ "logical_date": instant_str,
+ "data_interval_start":
instant.subtract(days=1).to_iso8601_string(),
+ "data_interval_end": instant_str,
+ "run_after": instant_str,
+ "start_date": instant_str,
+ "end_date": None,
+ "run_type": "manual",
+ "conf": {},
+ },
+ "task_reschedule_count": 0,
+ "max_tries": 0,
+ "should_retry": False,
+ "variables": [],
+ "connections": [],
+ "xcom_keys_to_clear": [],
+ }