Copilot commented on code in PR #64745:
URL: https://github.com/apache/airflow/pull/64745#discussion_r3066475109


##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -257,6 +260,9 @@ def execute_complete(self, context: Context, event: 
dict[str, Any] | None = None
 
         self.job_id = validated_event["job_id"]
 
+        # Persist operator links for UI
+        self._persist_links(context)
+
         # Fetch logs if awslogs_enabled
         if self.awslogs_enabled:
             self.monitor_job(context)  # fetch logs, no need to return

Review Comment:
   `execute_complete()` persists links and then (when `awslogs_enabled`) calls 
`monitor_job()`, which immediately calls `_persist_links()` again. This 
duplicates XCom writes and AWS lookups. Consider removing one of the two calls 
(e.g., let `monitor_job()` handle persistence for the `awslogs_enabled` branch, 
or add a parameter/flag to `monitor_job()` to avoid re-persisting when already 
done).
   ```suggestion
           # Fetch logs if awslogs_enabled. monitor_job() persists operator 
links,
           # so avoid duplicating that work here.
           if self.awslogs_enabled:
               self.monitor_job(context)  # fetch logs, no need to return
           else:
               # Persist operator links for UI when monitor_job() is not called.
               self._persist_links(context)
   ```



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -403,6 +395,50 @@ def monitor_job(self, context: Context):
                 aws_partition=self.hook.conn_partition,
                 **awslogs[0],
             )
+        else:
+            # Persist placeholder to prevent "XCom not found" warnings
+            # CloudWatch logs will be updated when job completes
+            context["task_instance"].xcom_push(
+                key="cloudwatch_events",
+                value=None,
+            )
+
+    def monitor_job(self, context: Context):
+        """
+        Monitor an AWS Batch job.
+
+        This can raise an exception or an AirflowTaskTimeout if the task was
+        created with ``execution_timeout``.
+        """
+        if not self.job_id:
+            raise AirflowException("AWS Batch job - job_id was not found")
+
+        # Persist operator links
+        self._persist_links(context)
+
+        if self.awslogs_enabled:
+            if self.waiters:
+                self.waiters.wait_for_job(self.job_id, 
get_batch_log_fetcher=self._get_batch_log_fetcher)
+            else:
+                self.hook.wait_for_job(self.job_id, 
get_batch_log_fetcher=self._get_batch_log_fetcher)
+        else:
+            if self.waiters:
+                self.waiters.wait_for_job(self.job_id)
+            else:
+                self.hook.wait_for_job(self.job_id)

Review Comment:
   In the non-deferrable path, `_persist_links()` now runs *before* waiting for 
the job. If CloudWatch logs are not available yet, `_persist_links()` pushes a 
`None` placeholder and the operator link may never be updated to a real 
CloudWatch link (because persistence is not rerun after `wait_for_job`). 
Consider persisting job-definition/job-queue links before waiting, then 
persisting the CloudWatch log link after `wait_for_job` (or call 
`_persist_links()` again after waiting but avoid redoing the non-log work).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to