This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 57ceb425880 [v3-2-test] Fix missing dag_id in get_task_instance (#64957) (#64968) (#65067)
57ceb425880 is described below

commit 57ceb425880a3d4c6551a6c8dd667a78b3f3fabe
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Apr 11 20:30:28 2026 +0200

    [v3-2-test] Fix missing dag_id in get_task_instance (#64957) (#64968) (#65067)
    
    (cherry picked from commit 4ecbd59c0bea22d5a4940f9b67f837b09bd8ec6c)
    
    Co-authored-by: holmuk <[email protected]>
---
 airflow-core/src/airflow/models/taskinstance.py    |  1 +
 .../tests/unit/models/test_taskinstance.py         | 75 ++++++++++++++++++++++
 2 files changed, 76 insertions(+)

diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index a0b95d8a3ea..742bc363724 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -792,6 +792,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
             select(TaskInstance)
             .options(lazyload(TaskInstance.dag_run))  # lazy load dag run to avoid locking it
             .filter_by(
+                dag_id=dag_id,
                 run_id=run_id,
                 task_id=task_id,
                 map_index=map_index,
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py
index 07dbde0ec9b..7a47198cb8b 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -2605,6 +2605,81 @@ class TestTaskInstance:
         # the new try_id should be different from what's recorded in tih
         assert tih[0].task_instance_id == try_id
 
+    @pytest.mark.parametrize(
+        ("first_ti", "second_ti"),
+        [
+            pytest.param(
+                ("dag_1", "run_1", "task_1", -1),
+                ("dag_2", "run_1", "task_1", -1),
+                id="tasks_with_different_dags",
+            ),
+            pytest.param(
+                ("dag_1", "run_1", "task_1", -1),
+                ("dag_1", "run_2", "task_1", -1),
+                id="tasks_with_different_runs",
+            ),
+            # There are no cases with equal dag_id/run_id because create_task_instance()
+            # creates a DagRun each time, and DagRun has a unique (dag_id, run_id) constraint.
+        ],
+    )
+    def test_get_task_instance_disambiguates_by_dag_id_and_run_id(
+        self, create_task_instance, session, first_ti, second_ti
+    ):
+        dag_id_1, run_id_1, task_id_1, map_index_1 = first_ti
+        dag_id_2, run_id_2, task_id_2, map_index_2 = second_ti
+
+        ti1 = create_task_instance(
+            dag_id=dag_id_1,
+            run_id=run_id_1,
+            task_id=task_id_1,
+            map_index=map_index_1,
+            session=session,
+        )
+        ti2 = create_task_instance(
+            dag_id=dag_id_2,
+            run_id=run_id_2,
+            task_id=task_id_2,
+            map_index=map_index_2,
+            session=session,
+        )
+
+        # Regression setup for #64957: if dag_id is ignored, this lookup key becomes ambiguous.
+        if dag_id_1 != dag_id_2:
+            ambiguous_count = session.scalar(
+                select(func.count())
+                .select_from(TI)
+                .filter_by(run_id=run_id_1, task_id=task_id_1, map_index=map_index_1)
+            )
+            assert ambiguous_count == 2, "Setup failure: expected two TIs matching without dag_id filter"
+
+        # This case does not target the original regression directly (run_id was already filtered),
+        # but we keep it as defense-in-depth against future changes.
+        found_1 = TI.get_task_instance(
+            dag_id=dag_id_1,
+            run_id=run_id_1,
+            task_id=task_id_1,
+            map_index=map_index_1,
+            session=session,
+        )
+        found_2 = TI.get_task_instance(
+            dag_id=dag_id_2,
+            run_id=run_id_2,
+            task_id=task_id_2,
+            map_index=map_index_2,
+            session=session,
+        )
+
+        assert found_1 is not None
+        assert found_2 is not None
+
+        assert found_1.id == ti1.id
+        assert found_2.id == ti2.id
+
+        # Keep dag_id assertions explicit to document the regression intent (#64957):
+        # get_task_instance() must disambiguate identical run/task/map_index by dag_id.
+        assert found_1.dag_id == dag_id_1
+        assert found_2.dag_id == dag_id_2
+
 
 @pytest.mark.parametrize("pool_override", [None, "test_pool2"])
 @pytest.mark.parametrize("queue_by_policy", [None, "forced_queue"])

Reply via email to