Copilot commented on code in PR #64745:
URL: https://github.com/apache/airflow/pull/64745#discussion_r3066475109
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -257,6 +260,9 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None):
         self.job_id = validated_event["job_id"]
+        # Persist operator links for UI
+        self._persist_links(context)
+
         # Fetch logs if awslogs_enabled
         if self.awslogs_enabled:
             self.monitor_job(context)  # fetch logs, no need to return
Review Comment:
`execute_complete()` persists links and then (when `awslogs_enabled`) calls
`monitor_job()`, which immediately calls `_persist_links()` again. This
duplicates the XCom writes and AWS lookups. Consider removing one of the two
calls: either let `monitor_job()` handle persistence for the `awslogs_enabled`
branch, or add a parameter/flag to `monitor_job()` so it skips re-persisting
when persistence has already been done.
```suggestion
        # Fetch logs if awslogs_enabled. monitor_job() persists operator links,
        # so avoid duplicating that work here.
        if self.awslogs_enabled:
            self.monitor_job(context)  # fetch logs, no need to return
        else:
            # Persist operator links for UI when monitor_job() is not called.
            self._persist_links(context)
```
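The flag-based alternative mentioned above could look like the minimal sketch below. This is a hypothetical illustration, not the real operator: the stub class stands in for `BatchOperator`, and `persist_calls` is a counter standing in for the XCom writes and AWS lookups that would otherwise be duplicated.

```python
# Hypothetical sketch of the "flag on monitor_job()" option. Names here
# mirror the operator under review but the class itself is a stub.
class BatchOperatorSketch:
    def __init__(self, awslogs_enabled: bool):
        self.awslogs_enabled = awslogs_enabled
        self.persist_calls = 0  # stand-in for XCom writes / AWS lookups

    def _persist_links(self, context):
        self.persist_calls += 1

    def monitor_job(self, context, persist_links: bool = True):
        # The hypothetical flag: callers that already persisted links
        # pass persist_links=False to avoid the duplicate work.
        if persist_links:
            self._persist_links(context)
        # ... wait_for_job / log fetching elided ...

    def execute_complete(self, context):
        if self.awslogs_enabled:
            # monitor_job() handles persistence; do not repeat it here.
            self.monitor_job(context)
        else:
            self._persist_links(context)


op = BatchOperatorSketch(awslogs_enabled=True)
op.execute_complete(context={})
assert op.persist_calls == 1  # persisted exactly once, not twice
```

Either branch of `execute_complete()` now results in exactly one persistence pass, which is the property the review comment asks for.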
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -403,6 +395,50 @@ def monitor_job(self, context: Context):
                 aws_partition=self.hook.conn_partition,
                 **awslogs[0],
             )
+        else:
+            # Persist placeholder to prevent "XCom not found" warnings
+            # CloudWatch logs will be updated when job completes
+            context["task_instance"].xcom_push(
+                key="cloudwatch_events",
+                value=None,
+            )
+
+    def monitor_job(self, context: Context):
+        """
+        Monitor an AWS Batch job.
+
+        This can raise an exception or an AirflowTaskTimeout if the task was
+        created with ``execution_timeout``.
+        """
+        if not self.job_id:
+            raise AirflowException("AWS Batch job - job_id was not found")
+
+        # Persist operator links
+        self._persist_links(context)
+
+        if self.awslogs_enabled:
+            if self.waiters:
+                self.waiters.wait_for_job(self.job_id, get_batch_log_fetcher=self._get_batch_log_fetcher)
+            else:
+                self.hook.wait_for_job(self.job_id, get_batch_log_fetcher=self._get_batch_log_fetcher)
+        else:
+            if self.waiters:
+                self.waiters.wait_for_job(self.job_id)
+            else:
+                self.hook.wait_for_job(self.job_id)
Review Comment:
In the non-deferrable path, `_persist_links()` now runs *before* waiting for
the job. If the CloudWatch logs are not available yet, `_persist_links()`
pushes a `None` placeholder, and the operator link may never be updated to a
real CloudWatch link, because persistence is not rerun after `wait_for_job`.
Consider persisting the job-definition/job-queue links before waiting and the
CloudWatch log link after `wait_for_job` completes (or call `_persist_links()`
again after waiting, skipping the non-log work that was already done).
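The split-persistence approach suggested above can be sketched as follows. This is a hypothetical, self-contained illustration: the stub class and its method names (`_persist_static_links`, `_persist_cloudwatch_link`) are invented for the sketch, `self.xcom` stands in for XCom storage, and `wait_for_job()` stands in for the real hook/waiter call after which the log stream name is known.

```python
# Hypothetical sketch: persist the links that are known up front before
# waiting, and the CloudWatch link only after the job has run and its
# log stream actually exists.
class MonitorJobSketch:
    def __init__(self):
        self.xcom = {}  # stand-in for XCom storage

    def _persist_static_links(self):
        # Job definition and queue are known before the job runs.
        self.xcom["batch_job_definition"] = "demo-job-definition"
        self.xcom["batch_job_queue"] = "demo-job-queue"

    def _persist_cloudwatch_link(self, stream_name):
        # Only meaningful once the stream exists; no None placeholder needed.
        self.xcom["cloudwatch_events"] = stream_name

    def wait_for_job(self):
        # Stand-in for hook/waiter wait_for_job(); after it returns,
        # the log stream name is available.
        return "demo/default/abc123"

    def monitor_job(self):
        self._persist_static_links()           # before waiting
        stream = self.wait_for_job()           # block until job finishes
        self._persist_cloudwatch_link(stream)  # after waiting


m = MonitorJobSketch()
m.monitor_job()
assert m.xcom["cloudwatch_events"] == "demo/default/abc123"
```

Splitting persistence this way avoids the stale `None` placeholder while still keeping a single persistence pass for the links that do not depend on job completion.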