This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ce1270b96fd Return 410 Gone for heartbeat when cleared TI exists in TIH (#61631)
ce1270b96fd is described below
commit ce1270b96fd22165f76e437d57eb812abb6691a3
Author: André Ahlert <[email protected]>
AuthorDate: Fri Apr 3 16:04:02 2026 -0300
Return 410 Gone for heartbeat when cleared TI exists in TIH (#61631)
* Return 410 Gone for heartbeat when TI was cleared and moved to TIH
When a running task instance is cleared, its previous try is archived
to the Task Instance History table and the TI receives a new UUID.
Subsequent heartbeats from the old process get a 404 because the old
UUID no longer exists in the TI table.
This change improves the error handling: when the task instance targeted
by a heartbeat is not found, the server now checks the Task Instance
History (TIH) table. If the UUID exists in TIH, it returns 410 Gone
instead of 404 Not Found, giving the client a more specific signal that
the task was cleared rather than that it never existed.
- Server: check TIH on heartbeat NoResultFound, return 410 if found
- Supervisor: handle 410 Gone the same as 404/409 (terminate the process)
- Keep 404 for TIs that genuinely never existed
closes: #53140
* Update task_instances.py
Co-authored-by: Amogh Desai <[email protected]>
* Update task_instances.py
Co-authored-by: Amogh Desai <[email protected]>
* Update test_task_instances.py
Co-authored-by: Amogh Desai <[email protected]>
* fix(api): use task_instance_id in heartbeat 410 path and align detail message
- Replace undefined ti_id_str with task_instance_id in TIH query and log
- Use task_instance_id (UUID) for TIH.task_instance_id comparison
- Set the 410 Gone detail message to match the test expectation
---------
Co-authored-by: Amogh Desai <[email protected]>
---
.../execution_api/routes/task_instances.py | 22 ++++++++++++++
.../versions/head/test_task_instances.py | 34 ++++++++++++++++++++++
.../src/airflow/sdk/execution_time/supervisor.py | 2 +-
3 files changed, 57 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 5f5073c916b..bb1666e5137 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -70,6 +70,7 @@ from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun as DR
from airflow.models.log import Log
from airflow.models.taskinstance import TaskInstance as TI,
_stop_remaining_tasks
+from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.trigger import Trigger
from airflow.models.xcom import XComModel
@@ -679,6 +680,9 @@ def ti_skip_downstream(
status.HTTP_409_CONFLICT: {
"description": "The TI attempting to heartbeat should be
terminated for the given reason"
},
+ status.HTTP_410_GONE: {
+ "description": "Task Instance not found in the TI table but exists
in the Task Instance History table"
+ },
HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload for
the state transition"},
},
)
@@ -702,6 +706,24 @@ def ti_heartbeat(
"Retrieved current task state", state=previous_state,
current_hostname=hostname, current_pid=pid
)
except NoResultFound:
+ # Check if the TI exists in the Task Instance History table.
+ # If it does, it was likely cleared while running, so return 410 Gone
+ # instead of 404 Not Found to give the client a more specific signal.
+ tih_exists = session.scalar(
+
select(func.count(TIH.task_instance_id)).where(TIH.task_instance_id ==
task_instance_id)
+ )
+ if tih_exists:
+ log.error(
+ "TaskInstance was previously cleared and archived in history,
heartbeat skipped",
+ ti_id=str(task_instance_id),
+ )
+ raise HTTPException(
+ status_code=status.HTTP_410_GONE,
+ detail={
+ "reason": "not_found",
+ "message": "Task Instance not found, it may have been
moved to the Task Instance History table",
+ },
+ )
log.error("Task Instance not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 7f766ede71e..a41396e4960 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -1832,6 +1832,40 @@ class TestTIHealthEndpoint:
"message": "Task Instance not found",
}
+ def test_ti_heartbeat_cleared_task_returns_410(self, client, session,
create_task_instance):
+ """Test that a 410 error is returned when a TI was cleared and moved
to TIH."""
+ ti = create_task_instance(
+ task_id="test_ti_heartbeat_cleared",
+ state=State.RUNNING,
+ hostname="random-hostname",
+ pid=1547,
+ session=session,
+ )
+ session.commit()
+ old_ti_id = ti.id
+
+ # Simulate task being cleared: this archives the current try to TIH
+ # and assigns a new UUID to the TI, mirroring
prepare_db_for_next_try().
+ ti.prepare_db_for_next_try(session)
+ session.commit()
+
+ assert session.get(TaskInstance, old_ti_id) is None
+ tih = session.scalar(
+
select(TaskInstanceHistory).where(TaskInstanceHistory.task_instance_id ==
old_ti_id)
+ )
+ assert tih is not None
+
+ response = client.put(
+ f"/execution/task-instances/{old_ti_id}/heartbeat",
+ json={"hostname": "random-hostname", "pid": 1547},
+ )
+
+ assert response.status_code == 410
+ assert response.json()["detail"] == {
+ "reason": "not_found",
+ "message": "Task Instance not found, it may have been moved to the
Task Instance History table",
+ }
+
@pytest.mark.parametrize(
"ti_state",
[State.SUCCESS, State.FAILED],
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index a4386927fee..8675305dde9 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1184,7 +1184,7 @@ class ActivitySubprocess(WatchedSubprocess):
# Reset the counter on success
self.failed_heartbeats = 0
except ServerResponseError as e:
- if e.response.status_code in {HTTPStatus.NOT_FOUND,
HTTPStatus.CONFLICT}:
+ if e.response.status_code in {HTTPStatus.NOT_FOUND,
HTTPStatus.GONE, HTTPStatus.CONFLICT}:
log.error(
"Server indicated the task shouldn't be running anymore",
detail=e.detail,