Copilot commented on code in PR #64745:
URL: https://github.com/apache/airflow/pull/64745#discussion_r3067566426


##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -330,18 +337,38 @@ def submit_job(self, context: Context):
             job_id=self.job_id,
         )
 
-    def monitor_job(self, context: Context):
+    def _persist_links(
+        self, context: Context, skip_cloudwatch: bool = False, job_description: dict | None = None
+    ) -> dict:
         """
-        Monitor an AWS Batch job.
+        Persist operator links for UI display.
 
-        This can raise an exception or an AirflowTaskTimeout if the task was
-        created with ``execution_timeout``.
+        This method retrieves job details and persists the operator links
+        (job definition, job queue, CloudWatch logs) as XCom values so they
+        can be rendered in the Airflow UI.
+
+        :param context: Task context
+        :param skip_cloudwatch: If True, skip fetching CloudWatch logs (useful before deferring)
+        :param job_description: Optional pre-fetched job description to avoid redundant API calls
+        :return: Job description dict
         """
         if not self.job_id:
             raise AirflowException("AWS Batch job - job_id was not found")
 
+        # Fetch job description (needed for return value and link persistence)
+        try:
+            job_desc = job_description or self.hook.get_job_description(self.job_id)
+        except KeyError:
+            self.log.warning("AWS Batch job (%s) description not available", self.job_id)
+            return {}

Review Comment:
   `BatchClientHook.get_job_description()` raises `AirflowException` on failure 
(after retries), not `KeyError`, so this `except KeyError` will never run and 
the warning/empty-dict fallback is effectively dead code. Either catch 
`AirflowException` here (if you really want to proceed without a description) 
or remove the try/except and let the exception propagate.
   ```suggestion
        job_desc = job_description or self.hook.get_job_description(self.job_id)
   ```
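If the fallback behavior is actually wanted, a minimal self-contained sketch of the comment's first alternative (catching `AirflowException` instead of `KeyError`); the `AirflowException` and hook classes below are stand-ins for this sketch, not the real Airflow imports:

```python
# Stand-in for airflow.exceptions.AirflowException, so the sketch runs alone.
class AirflowException(Exception):
    pass


class StubHook:
    """Hypothetical hook that fails like BatchClientHook does after retries."""

    def get_job_description(self, job_id):
        raise AirflowException(f"describe_jobs failed for {job_id}")


def fetch_description(hook, job_id, job_description=None):
    # Catch the exception the hook actually raises, not KeyError.
    try:
        return job_description or hook.get_job_description(job_id=job_id)
    except AirflowException:
        return {}  # proceed without a description, as the warning path intended
```

With this shape the fallback is live: `fetch_description(StubHook(), "job-123")` returns `{}` instead of dying on an uncaught hook error.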



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -330,18 +337,38 @@ def submit_job(self, context: Context):
             job_id=self.job_id,
         )
 
-    def monitor_job(self, context: Context):
+    def _persist_links(
+        self, context: Context, skip_cloudwatch: bool = False, job_description: dict | None = None
+    ) -> dict:
         """
-        Monitor an AWS Batch job.
+        Persist operator links for UI display.
 
-        This can raise an exception or an AirflowTaskTimeout if the task was
-        created with ``execution_timeout``.
+        This method retrieves job details and persists the operator links
+        (job definition, job queue, CloudWatch logs) as XCom values so they
+        can be rendered in the Airflow UI.
+
+        :param context: Task context
+        :param skip_cloudwatch: If True, skip fetching CloudWatch logs (useful before deferring)
+        :param job_description: Optional pre-fetched job description to avoid redundant API calls
+        :return: Job description dict
         """
         if not self.job_id:
             raise AirflowException("AWS Batch job - job_id was not found")
 
+        # Fetch job description (needed for return value and link persistence)
+        try:
+            job_desc = job_description or self.hook.get_job_description(self.job_id)

Review Comment:
   `get_job_description()` is called with a positional argument here, but the surrounding codebase and tests treat this as a keyword call (`job_id=...`). Using the keyword avoids brittle mock assertions and is consistent with the hook signature, especially since the unit tests assert `get_job_description(job_id=JOB_ID)`.
   ```suggestion
            job_desc = job_description or self.hook.get_job_description(job_id=self.job_id)
   ```
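To illustrate why the keyword form matters for the mock assertions the comment cites, a small `unittest.mock` sketch (the assertion style mirrors, but is not copied from, the provider's unit tests):

```python
from unittest.mock import MagicMock

hook = MagicMock()

# Keyword call, as the suggestion recommends: the assertion matches.
hook.get_job_description(job_id="job-123")
hook.get_job_description.assert_called_once_with(job_id="job-123")

# A positional call records different call args, so the very same
# assertion raises AssertionError even though the job_id value is equal.
hook.reset_mock()
hook.get_job_description("job-123")
try:
    hook.get_job_description.assert_called_once_with(job_id="job-123")
    matched = True
except AssertionError:
    matched = False
```

`MagicMock` compares positional and keyword arguments separately, so a test written as `assert_called_once_with(job_id=JOB_ID)` silently becomes brittle the moment the production code switches to a positional call.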



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -330,18 +337,38 @@ def submit_job(self, context: Context):
             job_id=self.job_id,
         )
 
-    def monitor_job(self, context: Context):
+    def _persist_links(
+        self, context: Context, skip_cloudwatch: bool = False, job_description: dict | None = None
+    ) -> dict:
         """
-        Monitor an AWS Batch job.
+        Persist operator links for UI display.
 
-        This can raise an exception or an AirflowTaskTimeout if the task was
-        created with ``execution_timeout``.
+        This method retrieves job details and persists the operator links
+        (job definition, job queue, CloudWatch logs) as XCom values so they
+        can be rendered in the Airflow UI.
+
+        :param context: Task context
+        :param skip_cloudwatch: If True, skip fetching CloudWatch logs (useful before deferring)
+        :param job_description: Optional pre-fetched job description to avoid redundant API calls
+        :return: Job description dict
         """
         if not self.job_id:
             raise AirflowException("AWS Batch job - job_id was not found")
 
+        # Fetch job description (needed for return value and link persistence)
+        try:
+            job_desc = job_description or self.hook.get_job_description(self.job_id)
+        except KeyError:
+            self.log.warning("AWS Batch job (%s) description not available", self.job_id)
+            return {}
+
+        # Check if we can persist links (requires task_instance in context)
+        task_instance = context.get("task_instance") if context else None
+        if not task_instance:
+            self.log.debug("Skipping link persistence: task_instance not available in context")
+            return job_desc
+

Review Comment:
   The link-persistence guard is checking `context.get("task_instance")`, but 
AWS operator links in this codebase persist via `context["ti"].xcom_push` (see 
`BaseAwsLink.persist`). This means `_persist_links()` can incorrectly skip 
persistence when the context only contains `ti`, and it also changes behavior 
for callers that pass `context=None` (existing unit test 
`test_monitor_job_with_logs` calls `monitor_job(context=None)` and expects the 
link `persist()` methods to be invoked). Consider removing this guard entirely 
and letting `BaseAwsLink.persist` no-op when context/ti is missing, or at least 
key off `context.get("ti")` instead of `task_instance`.
   ```suggestion
   
   ```
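If the guard is kept rather than removed, a sketch of the comment's fallback option, keying off `"ti"` (the key the link-persistence path actually reads); the helper name here is hypothetical:

```python
def can_persist_links(context):
    """Return True only when the context carries the "ti" entry that the
    XCom-push path uses; tolerate context=None without raising.

    Note: this still skips persistence for context=None, which is why the
    comment's first option (dropping the guard and letting the persist
    helper no-op) may be preferable for callers like
    monitor_job(context=None).
    """
    return bool(context) and context.get("ti") is not None
```

Checking the wrong key is the bug the comment points at: `can_persist_links({"task_instance": ...})` is `False` here, so a context that only carries `"task_instance"` would (correctly, under this guard) not be treated as persistable.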



##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -257,9 +259,14 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None
 
         self.job_id = validated_event["job_id"]
 
-        # Fetch logs if awslogs_enabled
+        # Fetch logs and persist links if awslogs_enabled, otherwise just persist links
         if self.awslogs_enabled:
-            self.monitor_job(context)  # fetch logs, no need to return
+            # monitor_job() handles link persistence, log fetching, and success check
+            self.monitor_job(context)
+        else:
+            # Persist operator links and check job success
+            self._persist_links(context)
+            self.hook.check_job_success(self.job_id)

Review Comment:
   In `execute_complete()` when `awslogs_enabled` is False, calling 
`_persist_links(context)` introduces new AWS API calls (e.g. `describe_jobs`, 
and potentially CloudWatch log parsing) during completion even though the links 
were already persisted before deferral in `execute()` (and there’s no log link 
to fetch in this branch). This also breaks existing unit tests that call 
`execute_complete(context={...})` without stubbing the hook. Consider skipping 
`_persist_links()` entirely in this branch (just `check_job_success`), or at 
least guard it behind `context.get("ti")`/`do_xcom_push` and pass 
`skip_cloudwatch=True` to avoid extra log-related calls/warnings.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
