kiran2706 commented on code in PR #51949:
URL: https://github.com/apache/airflow/pull/51949#discussion_r2160701968
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1352,21 +1352,58 @@ def notify_dagrun_state_changed(self, msg: str = ""):
def handle_dag_callback(self, dag: SDKDAG, success: bool = True, reason:
str = "success"):
"""Only needed for `dag.test` where `execute_callbacks=True` is passed
to `update_state`."""
+ task_instances = self.get_task_instances()
+
+ # Identify the most relevant task instance
+ last_relevant_ti = None
+ if not success:
+ failed_tis = [ti for ti in task_instances if ti.state in
State.failed_states and ti.end_date]
+ failed_tis.sort(key=lambda x: x.end_date, reverse=True)
+ last_relevant_ti = failed_tis[0] if failed_tis else None
+ else:
+ success_tis = [ti for ti in task_instances if ti.state in
State.success_states and ti.end_date]
+ success_tis.sort(key=lambda x: x.end_date, reverse=True)
+ last_relevant_ti = success_tis[0] if success_tis else None
+
+ # Enrich DAG-level callback context
context: Context = { # type: ignore[assignment]
"dag": dag,
"run_id": str(self.run_id),
+ "execution_date": self.logical_date,
+ "start_date": self.start_date,
+ "end_date": self.end_date,
+ "data_interval_start": self.data_interval_start,
+ "data_interval_end": self.data_interval_end,
"reason": reason,
+ "run_duration": (
+ (self.end_date - self.start_date).total_seconds()
+ if self.start_date and self.end_date
+ else None
+ ),
}
+ # Add task-level metadata if available
+ if last_relevant_ti:
+ context.update(
+ {
+ "task_instance": last_relevant_ti,
+ "ti": last_relevant_ti,
+ "try_number": last_relevant_ti.try_number,
+ "max_tries": last_relevant_ti.max_tries,
+ "log_url": last_relevant_ti.log_url,
+ "mark_success_url": last_relevant_ti.mark_success_url,
+ }
+ )
Review Comment:
Thanks for the feedback! You're absolutely right, values like log_url,
try_number, etc., are already accessible via context["ti"], so defining them
separately is redundant. I initially added them thinking it might simplify
Jinja templating (e.g., allowing {{ log_url }} directly), but since this
callback context is used only in tests it's unnecessary.
I’ll simplify the update to just:
context.update(
task_instance=last_relevant_ti,
ti=last_relevant_ti,
)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]