This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 57ceb425880 [v3-2-test] Fix missing dag_id in get_task_instance (#64957) (#64968) (#65067)
57ceb425880 is described below
commit 57ceb425880a3d4c6551a6c8dd667a78b3f3fabe
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Apr 11 20:30:28 2026 +0200
[v3-2-test] Fix missing dag_id in get_task_instance (#64957) (#64968) (#65067)
(cherry picked from commit 4ecbd59c0bea22d5a4940f9b67f837b09bd8ec6c)
Co-authored-by: holmuk <[email protected]>
---
airflow-core/src/airflow/models/taskinstance.py | 1 +
.../tests/unit/models/test_taskinstance.py | 75 ++++++++++++++++++++++
2 files changed, 76 insertions(+)
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index a0b95d8a3ea..742bc363724 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -792,6 +792,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
select(TaskInstance)
.options(lazyload(TaskInstance.dag_run))  # lazy load dag run to avoid locking it
.filter_by(
+ dag_id=dag_id,
run_id=run_id,
task_id=task_id,
map_index=map_index,
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py
index 07dbde0ec9b..7a47198cb8b 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -2605,6 +2605,81 @@ class TestTaskInstance:
# the new try_id should be different from what's recorded in tih
assert tih[0].task_instance_id == try_id
+ @pytest.mark.parametrize(
+ ("first_ti", "second_ti"),
+ [
+ pytest.param(
+ ("dag_1", "run_1", "task_1", -1),
+ ("dag_2", "run_1", "task_1", -1),
+ id="tasks_with_different_dags",
+ ),
+ pytest.param(
+ ("dag_1", "run_1", "task_1", -1),
+ ("dag_1", "run_2", "task_1", -1),
+ id="tasks_with_different_runs",
+ ),
+ # There are no cases with equal dag_id/run_id because create_task_instance()
+ # creates a DagRun each time, and DagRun has a unique (dag_id, run_id) constraint.
+ ],
+ )
+ def test_get_task_instance_disambiguates_by_dag_id_and_run_id(
+ self, create_task_instance, session, first_ti, second_ti
+ ):
+ dag_id_1, run_id_1, task_id_1, map_index_1 = first_ti
+ dag_id_2, run_id_2, task_id_2, map_index_2 = second_ti
+
+ ti1 = create_task_instance(
+ dag_id=dag_id_1,
+ run_id=run_id_1,
+ task_id=task_id_1,
+ map_index=map_index_1,
+ session=session,
+ )
+ ti2 = create_task_instance(
+ dag_id=dag_id_2,
+ run_id=run_id_2,
+ task_id=task_id_2,
+ map_index=map_index_2,
+ session=session,
+ )
+
+ # Regression setup for #64957: if dag_id is ignored, this lookup key becomes ambiguous.
+ if dag_id_1 != dag_id_2:
+ ambiguous_count = session.scalar(
+ select(func.count())
+ .select_from(TI)
+ .filter_by(run_id=run_id_1, task_id=task_id_1, map_index=map_index_1)
+ )
+ assert ambiguous_count == 2, "Setup failure: expected two TIs matching without dag_id filter"
+
+ # This case does not target the original regression directly (run_id was already filtered),
+ # but we keep it as defense-in-depth against future changes.
+ found_1 = TI.get_task_instance(
+ dag_id=dag_id_1,
+ run_id=run_id_1,
+ task_id=task_id_1,
+ map_index=map_index_1,
+ session=session,
+ )
+ found_2 = TI.get_task_instance(
+ dag_id=dag_id_2,
+ run_id=run_id_2,
+ task_id=task_id_2,
+ map_index=map_index_2,
+ session=session,
+ )
+
+ assert found_1 is not None
+ assert found_2 is not None
+
+ assert found_1.id == ti1.id
+ assert found_2.id == ti2.id
+
+ # Keep dag_id assertions explicit to document the regression intent (#64957):
+ # get_task_instance() must disambiguate identical run/task/map_index by dag_id.
+ assert found_1.dag_id == dag_id_1
+ assert found_2.dag_id == dag_id_2
+
@pytest.mark.parametrize("pool_override", [None, "test_pool2"])
@pytest.mark.parametrize("queue_by_policy", [None, "forced_queue"])