Copilot commented on code in PR #64745:
URL: https://github.com/apache/airflow/pull/64745#discussion_r3068302982


##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -404,6 +416,52 @@ def monitor_job(self, context: Context):
                 **awslogs[0],
             )
 
+    def monitor_job(self, context: Context):
+        """
+        Monitor an AWS Batch job.
+
+        This can raise an exception or an AirflowTaskTimeout if the task was
+        created with ``execution_timeout``.
+        """
+        if not self.job_id:
+            raise AirflowException("AWS Batch job - job_id was not found")
+
+        # Persist job definition and queue links
+        self._persist_links(context)
+
+        if self.awslogs_enabled:
+            if self.waiters:
+                self.waiters.wait_for_job(self.job_id, get_batch_log_fetcher=self._get_batch_log_fetcher)
+            else:
+                self.hook.wait_for_job(self.job_id, get_batch_log_fetcher=self._get_batch_log_fetcher)
+        else:
+            if self.waiters:
+                self.waiters.wait_for_job(self.job_id)
+            else:
+                self.hook.wait_for_job(self.job_id)
+
+        # After job completes, fetch and persist CloudWatch logs once
+        try:
+            awslogs = self.hook.get_job_all_awslogs_info(self.job_id)
+            if awslogs:
+                self.log.info(
+                    "AWS Batch job (%s) CloudWatch Events details found. Links to logs:", self.job_id
+                )
+                link_builder = CloudWatchEventsLink()
+                for log in awslogs:
+                    self.log.info(link_builder.format_link(**log))
+
+                # Persist the first log stream as the UI link
+                CloudWatchEventsLink.persist(
+                    context=context,
+                    operator=self,
+                    region_name=self.hook.conn_region_name,
+                    aws_partition=self.hook.conn_partition,
+                    **awslogs[0],
+                )
+        except AirflowException as ae:
+            self.log.warning("Cannot determine where to find the AWS logs for this Batch job: %s", ae)
+

Review Comment:
   `monitor_job()` re-implements CloudWatch link persistence and uses a 
different warning message than `_persist_cloudwatch_link()`, so the two paths 
can diverge over time. Since `_persist_cloudwatch_link()` already encapsulates 
the persistence + logging behavior, consider reusing it here (and keeping 
`monitor_job()` focused on waiting + final success check).
   ```suggestion
           self._persist_cloudwatch_link(context)
   ```



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -257,9 +263,13 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None
 
         self.job_id = validated_event["job_id"]
 
-        # Fetch logs if awslogs_enabled
+        # Fetch logs if awslogs_enabled (links were already persisted before deferring)
         if self.awslogs_enabled:
-            self.monitor_job(context)  # fetch logs, no need to return
+            # monitor_job() handles log fetching and success check
+            self.monitor_job(context)
+        else:
+            # Links already persisted before deferring, just check job success
+            self.hook.check_job_success(self.job_id)

Review Comment:
   In deferrable mode, `execute_complete()` is invoked only after 
`BatchJobTrigger` has already waited for job completion. Calling 
`monitor_job()` here re-runs `wait_for_job()` (including potential delay/jitter 
+ extra describe calls), which can unnecessarily slow the task and increase 
Batch API usage. Consider splitting `monitor_job()` into (a) a wait/stream-logs 
part for non-deferrable mode and (b) a post-completion part for deferrable mode 
(e.g., persist links / fetch CloudWatch link and then `check_job_success`).
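
   The split described above could look roughly like the following. This is only a structural sketch: `_wait_for_completion` and `_on_job_complete` are hypothetical method names (not from the PR), and the hook is stubbed out rather than using the real `BatchClientHook` API.

   ```python
   class StubBatchHook:
       """Minimal stand-in for BatchClientHook, counting calls for illustration."""

       def __init__(self):
           self.wait_calls = 0
           self.check_calls = 0

       def wait_for_job(self, job_id):
           self.wait_calls += 1  # pretend to poll the job until it is terminal

       def check_job_success(self, job_id):
           self.check_calls += 1  # pretend to verify the job SUCCEEDED


   class BatchOperatorSketch:
       """Hypothetical shape for the suggested monitor_job() split."""

       def __init__(self, hook, job_id):
           self.hook = hook
           self.job_id = job_id

       def monitor_job(self, context):
           # Non-deferrable path: wait first, then do post-completion work.
           self._wait_for_completion()
           self._on_job_complete(context)

       def execute_complete(self, context, event):
           # Deferrable path: BatchJobTrigger already waited, so go straight
           # to the post-completion work -- no second wait_for_job() round trip.
           self.job_id = event["job_id"]
           self._on_job_complete(context)

       def _wait_for_completion(self):
           self.hook.wait_for_job(self.job_id)

       def _on_job_complete(self, context):
           # Persist links / fetch CloudWatch log info here, then verify success.
           self.hook.check_job_success(self.job_id)


   hook = StubBatchHook()
   op = BatchOperatorSketch(hook, "job-1")
   op.execute_complete(context={}, event={"job_id": "job-1"})
   print(hook.wait_calls, hook.check_calls)  # → 0 1: the deferred path never re-waits
   ```

   With this shape, `execute_complete()` only pays for the post-completion calls, while the non-deferrable `monitor_job()` keeps its current wait-then-finalize behavior.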



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -257,9 +263,13 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None
 
         self.job_id = validated_event["job_id"]
 
-        # Fetch logs if awslogs_enabled
+        # Fetch logs if awslogs_enabled (links were already persisted before deferring)
         if self.awslogs_enabled:
-            self.monitor_job(context)  # fetch logs, no need to return
+            # monitor_job() handles log fetching and success check
+            self.monitor_job(context)
+        else:
+            # Links already persisted before deferring, just check job success

Review Comment:
   `execute_complete()` now skips `monitor_job()` entirely when 
`awslogs_enabled=False`, which means in deferrable mode the CloudWatch operator 
link is never persisted after job completion (unlike the non-deferrable path 
where `monitor_job()` persists it after waiting). If the intent is that the UI 
link should be available regardless of whether logs are printed, consider 
persisting the CloudWatch link in this branch too (ideally guarded by 
`do_xcom_push`), or make the behavior consistent across 
deferrable/non-deferrable modes.
   ```suggestion
           # Persist the CloudWatch link after deferred completion too, even when logs
           # are not being fetched, to keep operator link behavior consistent.
               if self.do_xcom_push:
                   self._persist_cloudwatch_link(context)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]