This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 1d4a55207f7 [v3-2-test] Return 410 Gone for heartbeat when cleared TI exists in TIH (#61631) (#64693)
1d4a55207f7 is described below

commit 1d4a55207f7eccba68288ea1ec4ec62fb47329fa
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Apr 8 14:30:01 2026 +0300

    [v3-2-test] Return 410 Gone for heartbeat when cleared TI exists in TIH (#61631) (#64693)
    
    * Return 410 Gone for heartbeat when TI was cleared and moved to TIH
    
    When a running task instance is cleared, its previous try is archived
    to the Task Instance History table and the TI receives a new UUID.
    Subsequent heartbeats from the old process get a 404 because the old
    UUID no longer exists in the TI table.
    
    This change improves the error handling by checking the TIH table when
    a heartbeat TI is not found. If the UUID exists in TIH, return 410
    Gone instead of 404 Not Found, giving the client a more specific
    signal that the task was cleared rather than never existing.
    
    - Server: check TIH on heartbeat NoResultFound, return 410 if found
    - Supervisor: handle 410 Gone same as 404/409 (terminate process)
    - Keep 404 for TIs that genuinely never existed
    
    closes: #53140
    
    * Update task_instances.py
    
    
    
    * Update task_instances.py
    
    
    
    * Update test_task_instances.py
    
    
    
    * fix(api): use task_instance_id in heartbeat 410 path and align detail message
    
    - Replace undefined ti_id_str with task_instance_id in TIH query and log
    - Use task_instance_id (UUID) for TIH.task_instance_id comparison
    - Set 410 Gone detail message to match test expectation
    
    ---------
    (cherry picked from commit ce1270b96fd22165f76e437d57eb812abb6691a3)
    
    Co-authored-by: André Ahlert <[email protected]>
    Co-authored-by: Amogh Desai <[email protected]>
---
 .../execution_api/routes/task_instances.py         | 22 ++++++++++++++
 .../versions/head/test_task_instances.py           | 34 ++++++++++++++++++++++
 .../src/airflow/sdk/execution_time/supervisor.py   |  2 +-
 3 files changed, 57 insertions(+), 1 deletion(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 644b0f5fa2a..e1687206d55 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -70,6 +70,7 @@ from airflow.models.dag import DagModel
 from airflow.models.dagrun import DagRun as DR
 from airflow.models.log import Log
 from airflow.models.taskinstance import TaskInstance as TI, 
_stop_remaining_tasks
+from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
 from airflow.models.taskreschedule import TaskReschedule
 from airflow.models.trigger import Trigger
 from airflow.models.xcom import XComModel
@@ -679,6 +680,9 @@ def ti_skip_downstream(
         status.HTTP_409_CONFLICT: {
             "description": "The TI attempting to heartbeat should be 
terminated for the given reason"
         },
+        status.HTTP_410_GONE: {
+            "description": "Task Instance not found in the TI table but exists 
in the Task Instance History table"
+        },
         HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload for 
the state transition"},
     },
 )
@@ -702,6 +706,24 @@ def ti_heartbeat(
             "Retrieved current task state", state=previous_state, 
current_hostname=hostname, current_pid=pid
         )
     except NoResultFound:
+        # Check if the TI exists in the Task Instance History table.
+        # If it does, it was likely cleared while running, so return 410 Gone
+        # instead of 404 Not Found to give the client a more specific signal.
+        tih_exists = session.scalar(
+            
select(func.count(TIH.task_instance_id)).where(TIH.task_instance_id == 
task_instance_id)
+        )
+        if tih_exists:
+            log.error(
+                "TaskInstance was previously cleared and archived in history, 
heartbeat skipped",
+                ti_id=str(task_instance_id),
+            )
+            raise HTTPException(
+                status_code=status.HTTP_410_GONE,
+                detail={
+                    "reason": "not_found",
+                    "message": "Task Instance not found, it may have been 
moved to the Task Instance History table",
+                },
+            )
         log.error("Task Instance not found")
         raise HTTPException(
             status_code=status.HTTP_404_NOT_FOUND,
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 1ab690239fe..c6135711be9 100644
--- 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -1832,6 +1832,40 @@ class TestTIHealthEndpoint:
             "message": "Task Instance not found",
         }
 
+    def test_ti_heartbeat_cleared_task_returns_410(self, client, session, 
create_task_instance):
+        """Test that a 410 error is returned when a TI was cleared and moved 
to TIH."""
+        ti = create_task_instance(
+            task_id="test_ti_heartbeat_cleared",
+            state=State.RUNNING,
+            hostname="random-hostname",
+            pid=1547,
+            session=session,
+        )
+        session.commit()
+        old_ti_id = ti.id
+
+        # Simulate task being cleared: this archives the current try to TIH
+        # and assigns a new UUID to the TI, mirroring 
prepare_db_for_next_try().
+        ti.prepare_db_for_next_try(session)
+        session.commit()
+
+        assert session.get(TaskInstance, old_ti_id) is None
+        tih = session.scalar(
+            
select(TaskInstanceHistory).where(TaskInstanceHistory.task_instance_id == 
old_ti_id)
+        )
+        assert tih is not None
+
+        response = client.put(
+            f"/execution/task-instances/{old_ti_id}/heartbeat",
+            json={"hostname": "random-hostname", "pid": 1547},
+        )
+
+        assert response.status_code == 410
+        assert response.json()["detail"] == {
+            "reason": "not_found",
+            "message": "Task Instance not found, it may have been moved to the 
Task Instance History table",
+        }
+
     @pytest.mark.parametrize(
         "ti_state",
         [State.SUCCESS, State.FAILED],
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 1dfefee5404..0eee4889c25 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1184,7 +1184,7 @@ class ActivitySubprocess(WatchedSubprocess):
             # Reset the counter on success
             self.failed_heartbeats = 0
         except ServerResponseError as e:
-            if e.response.status_code in {HTTPStatus.NOT_FOUND, 
HTTPStatus.CONFLICT}:
+            if e.response.status_code in {HTTPStatus.NOT_FOUND, 
HTTPStatus.GONE, HTTPStatus.CONFLICT}:
                 log.error(
                     "Server indicated the task shouldn't be running anymore",
                     detail=e.detail,

Reply via email to