dondaum commented on code in PR #62164:
URL: https://github.com/apache/airflow/pull/62164#discussion_r3073422983
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1870,30 +1870,33 @@ def _mark_backfills_complete(self, session: Session = NEW_SESSION) -> None:
self.log.debug("checking for completed backfills.")
unfinished_states = (DagRunState.RUNNING, DagRunState.QUEUED)
now = timezone.utcnow()
- # todo: AIP-78 simplify this function to an update statement
initializing_cutoff = now - timedelta(minutes=2)
- query = select(Backfill).where(
- Backfill.completed_at.is_(None),
- # Guard: backfill must have at least one association,
- # otherwise it is still being set up (see #61375).
- # Allow cleanup of orphaned backfills older than 2 minutes
- # that failed during initialization and never got any associations.
- or_(
- exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)),
- Backfill.created_at < initializing_cutoff,
- ),
- ~exists(
- select(DagRun.id).where(
- and_(DagRun.backfill_id == Backfill.id, DagRun.state.in_(unfinished_states))
+ result = cast(
+ "CursorResult",
+ session.execute(
+ update(Backfill)
+ .where(
+ Backfill.completed_at.is_(None),
+ # Guard: backfill must have at least one association,
+ # otherwise it is still being set up (see #61375).
+ # Allow cleanup of orphaned backfills older than 2 minutes
+ # that failed during initialization and never got any associations.
+ or_(
+ exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)),
+ Backfill.created_at < initializing_cutoff,
+ ),
+ ~exists(
+ select(DagRun.id).where(
+ and_(DagRun.backfill_id == Backfill.id, DagRun.state.in_(unfinished_states))
+ )
+ ),
)
+ .values(completed_at=now)
),
Review Comment:
I checked the relevant code and I think we can add
`.execution_options(synchronize_session=False)`. The scheduler's event
scheduler calls this method periodically, but it does not use any `Backfill`
objects afterwards. As far as I can tell, the scheduler job does not handle
`Backfill` objects directly either, so there should be nothing loaded in the
session that needs to be synchronized. That said, I'd be happy if someone with
more knowledge of this code took a look as well.
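
To illustrate what the option changes, here is a minimal sketch against an
in-memory SQLite database. The two model classes are simplified stand-ins that
I made up for this example, not the real Airflow models, and the fixed
timestamp is arbitrary. It only shows that with `synchronize_session=False` an
object already loaded in the session keeps its old attribute value until it is
refreshed, which is harmless if nothing reads those objects afterwards.

```python
from datetime import datetime

from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy import create_engine, exists, select, update
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


# Hypothetical, heavily simplified stand-ins for the Airflow models.
class Backfill(Base):
    __tablename__ = "backfill"
    id = Column(Integer, primary_key=True)
    completed_at = Column(DateTime, nullable=True)


class DagRun(Base):
    __tablename__ = "dag_run"
    id = Column(Integer, primary_key=True)
    backfill_id = Column(Integer, ForeignKey("backfill.id"))
    state = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # Backfill 1 has only finished runs, backfill 2 still has a running one.
    session.add_all(
        [
            Backfill(id=1),
            Backfill(id=2),
            DagRun(backfill_id=1, state="success"),
            DagRun(backfill_id=2, state="running"),
        ]
    )
    session.commit()

    # Load backfill 1 into the session before the bulk update runs.
    loaded = session.get(Backfill, 1)

    stmt = (
        update(Backfill)
        .where(
            Backfill.completed_at.is_(None),
            ~exists(
                select(DagRun.id).where(
                    DagRun.backfill_id == Backfill.id,
                    DagRun.state.in_(("running", "queued")),
                )
            ),
        )
        .values(completed_at=datetime(2024, 1, 1))
        # Do not try to reconcile objects already loaded in the session.
        .execution_options(synchronize_session=False)
    )
    rowcount = session.execute(stmt).rowcount

    # The in-session object was not synchronized, so it still holds None.
    stale_value = loaded.completed_at

    # An explicit refresh re-reads the row and picks up the new value.
    session.refresh(loaded)
    fresh_value = loaded.completed_at
```

In this sketch only backfill 1 matches the `WHERE` clause. If some code path did
read `Backfill` objects from the same session after the update, it would need a
refresh or an expire first, which is the part I would like someone to confirm
is not the case here.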