Copilot commented on code in PR #64745:
URL: https://github.com/apache/airflow/pull/64745#discussion_r3067566426
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -330,18 +337,38 @@ def submit_job(self, context: Context):
             job_id=self.job_id,
         )

-    def monitor_job(self, context: Context):
+    def _persist_links(
+        self, context: Context, skip_cloudwatch: bool = False, job_description: dict | None = None
+    ) -> dict:
         """
-        Monitor an AWS Batch job.
+        Persist operator links for UI display.

-        This can raise an exception or an AirflowTaskTimeout if the task was
-        created with ``execution_timeout``.
+        This method retrieves job details and persists the operator links
+        (job definition, job queue, CloudWatch logs) as XCom values so they
+        can be rendered in the Airflow UI.
+
+        :param context: Task context
+        :param skip_cloudwatch: If True, skip fetching CloudWatch logs (useful before deferring)
+        :param job_description: Optional pre-fetched job description to avoid redundant API calls
+        :return: Job description dict
         """
         if not self.job_id:
             raise AirflowException("AWS Batch job - job_id was not found")

+        # Fetch job description (needed for return value and link persistence)
+        try:
+            job_desc = job_description or self.hook.get_job_description(self.job_id)
+        except KeyError:
+            self.log.warning("AWS Batch job (%s) description not available", self.job_id)
+            return {}
Review Comment:
`BatchClientHook.get_job_description()` raises `AirflowException` on failure
(after retries), not `KeyError`, so this `except KeyError` will never run and
the warning/empty-dict fallback is effectively dead code. Either catch
`AirflowException` here (if you really want to proceed without a description)
or remove the try/except and let the exception propagate.
```suggestion
        job_desc = job_description or self.hook.get_job_description(self.job_id)
```
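To make the first option from the comment concrete, here is a minimal, self-contained sketch of catching `AirflowException` instead of `KeyError`; the `FakeBatchHook`, `fetch_description`, and the local `AirflowException` class are illustrative stand-ins, not the real Airflow classes:

```python
class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""


class FakeBatchHook:
    """Stub hook that fails the way BatchClientHook actually does."""

    def get_job_description(self, job_id):
        # BatchClientHook.get_job_description raises AirflowException
        # after exhausting its retries, not KeyError.
        raise AirflowException(f"AWS Batch job ({job_id}) description error")


def fetch_description(hook, job_id, job_description=None):
    """Return a job description, falling back to {} on AirflowException."""
    try:
        return job_description or hook.get_job_description(job_id)
    except AirflowException:
        # Proceed without a description instead of failing link persistence.
        return {}


# The fallback now actually runs when the hook fails...
assert fetch_description(FakeBatchHook(), "job-123") == {}
# ...and a pre-fetched description short-circuits the hook call entirely.
assert fetch_description(FakeBatchHook(), "job-123", {"status": "SUCCEEDED"}) == {"status": "SUCCEEDED"}
```

With `except KeyError`, the first assertion would instead raise, which is exactly the dead-code problem the comment describes.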
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -330,18 +337,38 @@ def submit_job(self, context: Context):
             job_id=self.job_id,
         )

-    def monitor_job(self, context: Context):
+    def _persist_links(
+        self, context: Context, skip_cloudwatch: bool = False, job_description: dict | None = None
+    ) -> dict:
         """
-        Monitor an AWS Batch job.
+        Persist operator links for UI display.

-        This can raise an exception or an AirflowTaskTimeout if the task was
-        created with ``execution_timeout``.
+        This method retrieves job details and persists the operator links
+        (job definition, job queue, CloudWatch logs) as XCom values so they
+        can be rendered in the Airflow UI.
+
+        :param context: Task context
+        :param skip_cloudwatch: If True, skip fetching CloudWatch logs (useful before deferring)
+        :param job_description: Optional pre-fetched job description to avoid redundant API calls
+        :return: Job description dict
         """
         if not self.job_id:
             raise AirflowException("AWS Batch job - job_id was not found")

+        # Fetch job description (needed for return value and link persistence)
+        try:
+            job_desc = job_description or self.hook.get_job_description(self.job_id)
Review Comment:
`get_job_description()` is called with a positional argument here, but the
surrounding codebase/tests treat this as a keyword call (`job_id=...`). Using
the keyword avoids brittle mock assertions and is consistent with the hook
signature, especially since unit tests assert
`get_job_description(job_id=JOB_ID)`.
```suggestion
        job_desc = job_description or self.hook.get_job_description(job_id=self.job_id)
```
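The brittleness is easy to demonstrate with the standard library's `unittest.mock`: without a spec, a mock cannot normalize a positional call to its keyword form, so `assert_called_once_with(job_id=...)` fails against a positional call:

```python
from unittest.mock import MagicMock

hook = MagicMock()

# Positional call, as in the PR as written:
hook.get_job_description("job-123")
try:
    # The style of assertion the existing unit tests use:
    hook.get_job_description.assert_called_once_with(job_id="job-123")
    positional_matches = True
except AssertionError:
    # call("job-123") != call(job_id="job-123") for an un-specced mock.
    positional_matches = False

hook.reset_mock()

# Keyword call, as the suggestion recommends:
hook.get_job_description(job_id="job-123")
hook.get_job_description.assert_called_once_with(job_id="job-123")  # passes

assert positional_matches is False
```

(Autospeccing the mock against the real hook signature would also make the two forms compare equal, but matching the call style in production code is the simpler fix.)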
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -330,18 +337,38 @@ def submit_job(self, context: Context):
             job_id=self.job_id,
         )

-    def monitor_job(self, context: Context):
+    def _persist_links(
+        self, context: Context, skip_cloudwatch: bool = False, job_description: dict | None = None
+    ) -> dict:
         """
-        Monitor an AWS Batch job.
+        Persist operator links for UI display.

-        This can raise an exception or an AirflowTaskTimeout if the task was
-        created with ``execution_timeout``.
+        This method retrieves job details and persists the operator links
+        (job definition, job queue, CloudWatch logs) as XCom values so they
+        can be rendered in the Airflow UI.
+
+        :param context: Task context
+        :param skip_cloudwatch: If True, skip fetching CloudWatch logs (useful before deferring)
+        :param job_description: Optional pre-fetched job description to avoid redundant API calls
+        :return: Job description dict
         """
         if not self.job_id:
             raise AirflowException("AWS Batch job - job_id was not found")

+        # Fetch job description (needed for return value and link persistence)
+        try:
+            job_desc = job_description or self.hook.get_job_description(self.job_id)
+        except KeyError:
+            self.log.warning("AWS Batch job (%s) description not available", self.job_id)
+            return {}
+
+        # Check if we can persist links (requires task_instance in context)
+        task_instance = context.get("task_instance") if context else None
+        if not task_instance:
+            self.log.debug("Skipping link persistence: task_instance not available in context")
+            return job_desc
+
Review Comment:
The link-persistence guard is checking `context.get("task_instance")`, but
AWS operator links in this codebase persist via `context["ti"].xcom_push` (see
`BaseAwsLink.persist`). This means `_persist_links()` can incorrectly skip
persistence when the context only contains `ti`, and it also changes behavior
for callers that pass `context=None` (existing unit test
`test_monitor_job_with_logs` calls `monitor_job(context=None)` and expects the
link `persist()` methods to be invoked). Consider removing this guard entirely
and letting `BaseAwsLink.persist` no-op when context/ti is missing, or at least
key off `context.get("ti")` instead of `task_instance`.
```suggestion
```
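If a guard is kept at all, the fallback option from the comment looks roughly like this. It is a self-contained sketch with stub objects (`should_persist` and `FakeTI` are illustrative names, not Airflow APIs), showing why keying off `"ti"` matters:

```python
def should_persist(context):
    """Return the task instance used for xcom_push, or None to no-op.

    BaseAwsLink.persist ultimately pushes via context["ti"], so that is
    the key a guard has to check; a missing/None context no-ops safely.
    """
    if not context:
        return None
    return context.get("ti")


class FakeTI:
    """Stub task instance recording xcom pushes."""

    def __init__(self):
        self.pushed = {}

    def xcom_push(self, key, value):
        self.pushed[key] = value


ti = FakeTI()
assert should_persist({"ti": ti}) is ti                # links get persisted
assert should_persist({"task_instance": ti}) is None   # the PR guard's blind spot
assert should_persist(None) is None                    # context=None no-ops safely
```

The second assertion is the failure mode the comment flags: a context carrying only `"ti"` (the key the link machinery actually uses) would be wrongly treated as "cannot persist" by the `task_instance` guard.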
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -257,9 +259,14 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None
         self.job_id = validated_event["job_id"]

-        # Fetch logs if awslogs_enabled
+        # Fetch logs and persist links if awslogs_enabled, otherwise just persist links
         if self.awslogs_enabled:
-            self.monitor_job(context)  # fetch logs, no need to return
+            # monitor_job() handles link persistence, log fetching, and success check
+            self.monitor_job(context)
+        else:
+            # Persist operator links and check job success
+            self._persist_links(context)
+            self.hook.check_job_success(self.job_id)
Review Comment:
In `execute_complete()` when `awslogs_enabled` is False, calling
`_persist_links(context)` introduces new AWS API calls (e.g. `describe_jobs`,
and potentially CloudWatch log parsing) during completion even though the links
were already persisted before deferral in `execute()` (and there’s no log link
to fetch in this branch). This also breaks existing unit tests that call
`execute_complete(context={...})` without stubbing the hook. Consider skipping
`_persist_links()` entirely in this branch (just `check_job_success`), or at
least guard it behind `context.get("ti")`/`do_xcom_push` and pass
`skip_cloudwatch=True` to avoid extra log-related calls/warnings.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]