laurentpellegrino opened a new issue, #65044: URL: https://github.com/apache/airflow/issues/65044
### Under which category would you file this issue?
Airflow Core
### Apache Airflow version
3.2.0
### What happened and how to reproduce it?
`TaskInstance.get_task_instance` accepts a required `dag_id` argument but does not include it in the query's `filter_by`. The query filters only by `(run_id, task_id, map_index)`. Any caller that passes `dag_id` expecting disambiguation can silently get a row belonging to a different DAG when the triple happens to be unique across DAGs, and gets `sqlalchemy.exc.MultipleResultsFound` when it isn't.
`airflow-core/src/airflow/models/taskinstance.py` on `main`, L784–L807:

```python
@classmethod
@provide_session
def get_task_instance(
    cls,
    dag_id: str,
    run_id: str,
    task_id: str,
    map_index: int,
    lock_for_update: bool = False,
    session: Session = NEW_SESSION,
) -> TaskInstance | None:
    query = (
        select(TaskInstance)
        .options(lazyload(TaskInstance.dag_run))
        .filter_by(
            run_id=run_id,
            task_id=task_id,
            map_index=map_index,
        )
    )
    if lock_for_update:
        for attempt in run_with_db_retries(logger=cls.logger()):
            with attempt:
                return session.execute(query.with_for_update()).scalar_one_or_none()
    else:
        return session.execute(query).scalar_one_or_none()
```
`dag_id` is accepted but never used in the filter.
Real-world impact: permanent EdgeExecutor scheduler crash loop
`EdgeExecutor._update_orphaned_jobs` (`providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py` on `main`, L189; crashing call at L203) calls this on every scheduler sync for every lifeless `EdgeJobModel` row:
```python
for job in lifeless_jobs:
    ti = TaskInstance.get_task_instance(
        dag_id=job.dag_id,
        run_id=job.run_id,
        task_id=job.task_id,
        map_index=job.map_index,
        session=session,
    )
    job.state = ti.state if ti and ti.state else TaskInstanceState.REMOVED
    ...
```
If any orphaned edge_job row has a (run_id, task_id, map_index) triple
shared with a TaskInstance in a different DAG, this query raises
MultipleResultsFound. The exception propagates up
_update_orphaned_jobs → EdgeExecutor.sync → executor.heartbeat() →
_run_scheduler_loop, the scheduler process exits, its supervisor (Kubernetes,
systemd, etc.) restarts it, the exact same
stale edge_job row is picked up on the next sync, scheduler crashes again
— a permanent crash loop. The scheduler never dispatches another queued task to
the edge worker until the
offending row is manually removed from the metadata DB.
Triggering the collision is easy in practice:
- Multiple DAGs on the same cron (e.g. @daily) → identical
scheduled__<logical_date> run_ids
- Shared generic task_ids across those DAGs (done, cleanup, notify, end,
start, …)
- Same map_index (typically -1 for non-mapped tasks)
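The broken query semantics can be reproduced outside Airflow. This is a minimal sketch with `sqlite3` from the standard library; the table and columns are an illustrative stand-in for the real `task_instance` schema, not the actual Airflow model:

```python
import sqlite3

# Toy stand-in for the task_instance table: two DAGs on the same @daily
# schedule share a task_id, so their triples collide on every run.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, run_id TEXT, task_id TEXT, map_index INTEGER)"
)
run_id = "scheduled__2026-04-11T01:00:00+00:00"
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [("dag_a", run_id, "done", -1), ("dag_b", run_id, "done", -1)],
)

# Current behavior: dag_id missing from the filter -> both rows match,
# which is exactly what makes scalar_one_or_none() raise MultipleResultsFound.
without_dag = conn.execute(
    "SELECT dag_id FROM task_instance WHERE run_id=? AND task_id=? AND map_index=?",
    (run_id, "done", -1),
).fetchall()

# Proposed fix: dag_id included in the filter -> exactly one row.
with_dag = conn.execute(
    "SELECT dag_id FROM task_instance WHERE dag_id=? AND run_id=? AND task_id=? AND map_index=?",
    ("dag_a", run_id, "done", -1),
).fetchall()

print(len(without_dag), len(with_dag))  # 2 1
```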
Any edge worker event that leaves a row stuck in state=RUNNING past
[scheduler] task_instance_heartbeat_timeout (default 300s) — a worker OOM, a
SIGKILL, a network blip on the edge API, a
missed heartbeat while the edge_job completion callback was in flight — is
enough to arm the landmine. The scheduler then crashes on its next sync.
Traceback
```
Traceback (most recent call last):
  File ".../airflow/cli/commands/scheduler_command.py", line 48, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File ".../airflow/jobs/job.py", line 355, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File ".../airflow/jobs/job.py", line 384, in execute_job
    ret = execute_callable()
  File ".../airflow/jobs/scheduler_job_runner.py", line 1463, in _execute
    self._run_scheduler_loop()
  File ".../airflow/jobs/scheduler_job_runner.py", line 1610, in _run_scheduler_loop
    executor.heartbeat()
  File ".../airflow/executors/base_executor.py", line 310, in heartbeat
    self.sync()
  File ".../airflow/providers/edge3/executors/edge_executor.py", line 312, in sync
    orphaned = self._update_orphaned_jobs(session)
  File ".../airflow/providers/edge3/executors/edge_executor.py", line 203, in _update_orphaned_jobs
    ti = TaskInstance.get_task_instance(
        dag_id=job.dag_id,
        run_id=job.run_id,
        task_id=job.task_id,
        map_index=job.map_index,
        session=session,
    )
  File ".../airflow/models/taskinstance.py", line 806, in get_task_instance
    return session.execute(query).scalar_one_or_none()
  File ".../sqlalchemy/engine/result.py", line 1504, in scalar_one_or_none
    return self._only_one_row(raise_for_second_row=True, raise_for_none=False, scalar=True)
  File ".../sqlalchemy/engine/result.py", line 825, in _only_one_row
    raise exc.MultipleResultsFound(...)
sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when one or none was required
```
Reproducer
Two DAGs on the same schedule sharing a task_id:
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

for name in ("dag_a", "dag_b"):
    with DAG(name, start_date=datetime(2026, 1, 1), schedule="@daily", catchup=False):
        EmptyOperator(task_id="done")
```
Steps:
1. Configure the EdgeExecutor and run an edge worker.
2. Trigger both DAGs so each has a TaskInstance for
(run_id='scheduled__<logical_date>', task_id='done', map_index=-1).
3. Leave an edge_job row orphaned: state=running, last_update older than
[scheduler] task_instance_heartbeat_timeout (default 300s). Easiest way: kill
-9 the edge worker mid-task, or drop
the network between worker and API long enough to miss heartbeats.
4. On the next scheduler sync, `_update_orphaned_jobs` picks up that row and calls `get_task_instance`. Because `dag_id` is ignored in the filter, the query matches both TIs → `MultipleResultsFound` → scheduler process exits.
5. The supervisor restarts the scheduler; the same stale row is still in
the DB; go to step 4.
Direct DB proof
I ran the exact call the scheduler makes, with the exact arguments,
against the live metadata DB:
```python
>>> from airflow.models.taskinstance import TaskInstance
>>> from airflow.utils.session import create_session
>>> with create_session() as s:
...     TaskInstance.get_task_instance(
...         dag_id='dag_a',
...         run_id='scheduled__2026-04-11T01:00:00+00:00',
...         task_id='done',
...         map_index=-1,
...         session=s,
...     )
sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when one or none was required
```
The two matching rows:
| dag_id | task_id | run_id                               | state   |
|--------|---------|--------------------------------------|---------|
| dag_a  | done    | scheduled__2026-04-11T01:00:00+00:00 | success |
| dag_b  | done    | scheduled__2026-04-11T01:00:00+00:00 | success |
Both TaskInstances had already finished successfully hours earlier. The
edge_job rows were stale leftovers whose completion callback never updated
edge_job.state away from running, which
is what kept them eligible for orphan handling and re-triggered the crash
on every sync cycle.
Workaround
1. Manually delete the stale edge_job rows whose (run_id, task_id,
map_index) triples collide across DAGs.
2. Restart the scheduler.
3. Rename colliding task_ids in DAGs on identical schedules to be
DAG-unique (done → <dag_name>_done, etc.) so the collision cannot reoccur while
the core bug is unfixed.
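To find the landmines before deleting anything, a grouping query of this shape locates triples that resolve to more than one `dag_id`. The sketch below demonstrates the query against a toy `sqlite3` stand-in; against the real metadata DB the same `GROUP BY`/`HAVING` would run on the actual `task_instance` table (column names assumed to match):

```python
import sqlite3

# Toy stand-in for the task_instance table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, run_id TEXT, task_id TEXT, map_index INTEGER)"
)
run_id = "scheduled__2026-04-11T01:00:00+00:00"
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("dag_a", run_id, "done", -1),
        ("dag_b", run_id, "done", -1),
        ("dag_a", run_id, "unique_task", -1),  # no collision for this one
    ],
)

# Triples owned by more than one dag_id are the ones get_task_instance
# cannot disambiguate without the fix.
colliding = conn.execute(
    """
    SELECT run_id, task_id, map_index, COUNT(DISTINCT dag_id) AS n_dags
    FROM task_instance
    GROUP BY run_id, task_id, map_index
    HAVING COUNT(DISTINCT dag_id) > 1
    """
).fetchall()
print(colliding)
```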
### What you think should happen instead?
TaskInstance.get_task_instance should include dag_id in its filter. The
parameter is part of the method signature, it's required (not optional), and
every call site passes it expecting
disambiguation. The current behavior is silently wrong: for non-colliding
data it returns the "right" row by accident, and for colliding data it raises
MultipleResultsFound and surfaces as
an exception in whichever caller happens to trigger it first.
Minimal fix in airflow-core/src/airflow/models/taskinstance.py:
```python
query = (
    select(TaskInstance)
    .options(lazyload(TaskInstance.dag_run))
    .filter_by(
        dag_id=dag_id,
        run_id=run_id,
        task_id=task_id,
        map_index=map_index,
    )
)
```
Other call sites of get_task_instance that pass dag_id under the same
assumption should be audited — any place that pre-filters a set of candidate
jobs by dag_id and then looks up their
TIs is latently exposed to the same class of bug (silent wrong row in the
happy path, crash in the collision path).
Independent defensive hardening in providers/edge3
Even with the core fix merged, the scheduler loop should not be killable
by a single poisoned edge_job row. A stray database inconsistency, a future
provider regression, or an unrelated
IntegrityError/DataError on one row shouldn't take down the whole
scheduler — the loss of the entire dispatch pipeline is wildly disproportionate
to the blast radius of one stale job.
Wrap the per-row lookup in EdgeExecutor._update_orphaned_jobs
(providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py on
main, L203) in a try/except, log+skip on failure,
and mark the offending row as REMOVED so it doesn't come back on the next
sync:
```python
from sqlalchemy.exc import SQLAlchemyError

for job in lifeless_jobs:
    try:
        ti = TaskInstance.get_task_instance(
            dag_id=job.dag_id,
            run_id=job.run_id,
            task_id=job.task_id,
            map_index=job.map_index,
            session=session,
        )
    except SQLAlchemyError:  # MultipleResultsFound is a SQLAlchemyError subclass
        self.log.exception(
            "Failed to resolve TaskInstance for orphaned edge_job "
            "(dag_id=%s task_id=%s run_id=%s map_index=%s); marking as REMOVED",
            job.dag_id, job.task_id, job.run_id, job.map_index,
        )
        job.state = TaskInstanceState.REMOVED
        continue
    job.state = ti.state if ti and ti.state else TaskInstanceState.REMOVED
    ...
```
Both changes are small and independently mergeable. The core fix closes
the root cause; the edge3 hardening is the backstop that prevents a future
variant of the same class of bug from
causing another outage.
### Operating System
Talos Linux v1.11.6 (Kubernetes 1.33.10)
### Deployment
Other
### Apache Airflow Provider(s)
edge3
### Versions of Apache Airflow Providers
apache-airflow-providers-edge3==3.3.0
### Official Helm Chart version
Not Applicable
### Kubernetes Version
_No response_
### Helm Chart configuration
Not Applicable
### Docker Image customizations
Base image: apache/airflow:3.2.0
Added packages: elaunira-airflow, elaunira-airflow-providers-r2index,
elaunira-r2index, openplanetdata-airflow
No modifications to Airflow core or the edge3 provider.
### Anything else?
Frequency: deterministic once the DB holds a poisoned `edge_job` row. Our
scheduler accumulated 75 restarts in ~9 hours and never recovered on its own.
Workaround was to delete the 4 stale `edge_job` rows whose `(run_id, task_id,
map_index)` triples collided across DAGs, then restart the scheduler pod.
Renaming shared task_ids (`done`, `cleanup`) to DAG-unique names
prevents recurrence.
Collision surface is broader than it looks: any two DAGs on the same cron
schedule that share a task_id will produce colliding TIs on every run. Generic
names like `done`, `cleanup`, `notify`, `start`, `end` are common in template
DAGs.
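That collision surface can be audited at authoring time. A minimal sketch, with the DAG inventory hardcoded for illustration (in practice it would come from the parsed DagBag): any `(schedule, task_id)` pair owned by more than one DAG will produce identical `(run_id, task_id, map_index)` triples on every run.

```python
from collections import defaultdict

# Hypothetical inventory of (dag_id, schedule, task_ids).
dags = [
    ("dag_a", "@daily", ["start", "done"]),
    ("dag_b", "@daily", ["extract", "done"]),
    ("dag_c", "@hourly", ["done"]),  # different schedule -> different run_ids
]

shared = defaultdict(list)
for dag_id, schedule, task_ids in dags:
    for task_id in task_ids:
        shared[(schedule, task_id)].append(dag_id)

# Pairs claimed by more than one DAG are future collisions.
collisions = {k: v for k, v in shared.items() if len(v) > 1}
print(collisions)  # {('@daily', 'done'): ['dag_a', 'dag_b']}
```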
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)