kiran2706 commented on code in PR #51949:
URL: https://github.com/apache/airflow/pull/51949#discussion_r2160701968


##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1352,21 +1352,58 @@ def notify_dagrun_state_changed(self, msg: str = ""):
 
     def handle_dag_callback(self, dag: SDKDAG, success: bool = True, reason: 
str = "success"):
         """Only needed for `dag.test` where `execute_callbacks=True` is passed 
to `update_state`."""
+        task_instances = self.get_task_instances()
+
+        # Identify the most relevant task instance
+        last_relevant_ti = None
+        if not success:
+            failed_tis = [ti for ti in task_instances if ti.state in 
State.failed_states and ti.end_date]
+            failed_tis.sort(key=lambda x: x.end_date, reverse=True)
+            last_relevant_ti = failed_tis[0] if failed_tis else None
+        else:
+            success_tis = [ti for ti in task_instances if ti.state in 
State.success_states and ti.end_date]
+            success_tis.sort(key=lambda x: x.end_date, reverse=True)
+            last_relevant_ti = success_tis[0] if success_tis else None
+
+        # Enrich DAG-level callback context
         context: Context = {  # type: ignore[assignment]
             "dag": dag,
             "run_id": str(self.run_id),
+            "execution_date": self.logical_date,
+            "start_date": self.start_date,
+            "end_date": self.end_date,
+            "data_interval_start": self.data_interval_start,
+            "data_interval_end": self.data_interval_end,
             "reason": reason,
+            "run_duration": (
+                (self.end_date - self.start_date).total_seconds()
+                if self.start_date and self.end_date
+                else None
+            ),
         }
 
+        # Add task-level metadata if available
+        if last_relevant_ti:
+            context.update(
+                {
+                    "task_instance": last_relevant_ti,
+                    "ti": last_relevant_ti,
+                    "try_number": last_relevant_ti.try_number,
+                    "max_tries": last_relevant_ti.max_tries,
+                    "log_url": last_relevant_ti.log_url,
+                    "mark_success_url": last_relevant_ti.mark_success_url,
+                }
+            )

Review Comment:
   Thanks for the feedback! You're absolutely right, values like log_url, 
try_number, etc., are already accessible via context["ti"], so defining them 
separately is redundant. I initially added them thinking it might simplify 
Jinja templating (e.g., allowing {{ log_url }} directly), but since this 
callback context is used only in tests it's unnecessary.
   
   I’ll simplify the update to just:
   
   context.update(
       task_instance=last_relevant_ti,
       ti=last_relevant_ti,
   )
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to