Copilot commented on code in PR #64342:
URL: https://github.com/apache/airflow/pull/64342#discussion_r3025336710
##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py:
##########
@@ -87,6 +95,131 @@ def hook(self) -> AwsGenericHook:
config=self.botocore_config,
)
+ async def run(self) -> AsyncIterator[TriggerEvent]:
+ if not self.verbose:
+ async for event in super().run():
+ yield event
+ return
+
+ hook = self.hook()
+ async with (
+ await hook.get_async_conn() as glue_client,
+ await AwsLogsHook(
+ aws_conn_id=self.aws_conn_id, region_name=self.region_name
+ ).get_async_conn() as logs_client,
+ ):
+ # Get log group name from job run metadata and initial state in
one call
+ job_run_resp = await
glue_client.get_job_run(JobName=self.job_name, RunId=self.run_id)
+ log_group_output, log_group_error =
get_glue_log_group_names(job_run_resp["JobRun"])
+
+ output_token: str | None = None
+ error_token: str | None = None
+ job_run_state = job_run_resp["JobRun"]["JobRunState"]
+
+ for _attempt in range(self.attempts):
+ # Fetch and print logs from both output and error streams
+ try:
+ output_token = await self._forward_logs(
+ logs_client, log_group_output, self.run_id,
output_token
+ )
+ error_token = await self._forward_logs(
+ logs_client, log_group_error, self.run_id, error_token
+ )
+ except ClientError as e:
+ self.log.error(
+ "Failed to fetch logs for Glue Job %s Run %s: %s",
+ self.job_name,
+ self.run_id,
+ e,
+ )
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "message": f"Failed to fetch logs for Glue Job
{self.job_name} Run {self.run_id}: {e}",
+ self.return_key: self.return_value,
+ }
+ )
+ return
+
+ if job_run_state in ("FAILED", "TIMEOUT"):
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "message": f"Glue Job {self.job_name} Run
{self.run_id}"
+ f" exited with state: {job_run_state}",
+ self.return_key: self.return_value,
+ }
+ )
+ return
+ if job_run_state in ("SUCCEEDED", "STOPPED"):
+ self.log.info(
+ "Exiting Job %s Run %s State: %s",
+ self.job_name,
+ self.run_id,
+ job_run_state,
+ )
+ yield TriggerEvent({"status": "success", self.return_key:
self.return_value})
+ return
+
+ self.log.info(
+ "Polling for AWS Glue Job %s current run state: %s",
+ self.job_name,
+ job_run_state,
+ )
+ await asyncio.sleep(self.waiter_delay)
+
+ # Fetch updated state for next iteration
+ resp = await glue_client.get_job_run(JobName=self.job_name,
RunId=self.run_id)
+ job_run_state = resp["JobRun"]["JobRunState"]
+
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "message": f"Glue Job {self.job_name} Run {self.run_id}"
+ f" waiter exceeded max attempts ({self.attempts})",
+ self.return_key: self.return_value,
+ }
+ )
+
+ async def _forward_logs(
+ self,
+ logs_client: Any,
+ log_group: str,
+ log_stream: str,
+ next_token: str | None,
+ ) -> str | None:
+ # Matches the format used by the synchronous
GlueJobHook.print_job_logs.
+ fetched_logs: list[str] = []
+ while True:
+ token_arg: dict[str, str] = {"nextToken": next_token} if
next_token else {}
+ try:
+ response = await logs_client.get_log_events(
+ logGroupName=log_group,
+ logStreamName=log_stream,
+ startFromHead=True,
+ **token_arg,
+ )
+ except ClientError as e:
+ if e.response["Error"]["Code"] == "ResourceNotFoundException":
+ self.log.warning(
+ "No new Glue driver logs so far.\n"
+ "If this persists, check the CloudWatch dashboard at:
%r.",
+
f"https://{self.region_name}.console.aws.amazon.com/cloudwatch/home",
+ )
Review Comment:
`ResourceNotFoundException` handling builds the CloudWatch URL using
`self.region_name`, but `region_name` is often `None` (resolved from the AWS
connection instead), producing an invalid
`https://None.console.aws.amazon.com/...` link. Use the resolved region (e.g.,
`logs_client.meta.region_name` / hook `conn_region_name`) when constructing the
URL. Also, the sync path logs both the warning and the standard "No new log
from the Glue Job..." INFO line; here `_forward_logs` returns early and skips
that INFO message, so verbose output differs between sync and deferrable modes.
```suggestion
                    # Use the resolved region from the logs client (falling back to self.region_name),
                    # and still emit the standard "no new logs" INFO line for consistency with
                    # the synchronous GlueJobHook behavior.
                    region = getattr(getattr(logs_client, "meta", None), "region_name", None) or self.region_name
                    if region:
                        cloudwatch_url = f"https://{region}.console.aws.amazon.com/cloudwatch/home"
                    else:
                        cloudwatch_url = "https://console.aws.amazon.com/cloudwatch/home"
                    self.log.warning(
                        "No new Glue driver logs so far.\n"
                        "If this persists, check the CloudWatch dashboard at: %r.",
                        cloudwatch_url,
                    )
                    self.log.info(format_glue_logs(fetched_logs, log_group))
```
##########
providers/amazon/tests/unit/amazon/aws/triggers/test_glue.py:
##########
@@ -111,6 +113,238 @@ def test_serialization(self):
"waiter_delay": 10,
}
+ def test_serialization_verbose(self):
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="JobRunId",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_max_attempts=3,
+ waiter_delay=10,
+ )
+ classpath, kwargs = trigger.serialize()
+ assert kwargs["verbose"] is True
+
+ @pytest.mark.asyncio
+ @mock.patch.object(AwsLogsHook, "get_async_conn")
+ @mock.patch.object(GlueJobHook, "get_async_conn")
+ async def test_verbose_run_success(self, mock_glue_conn, mock_logs_conn):
+ """When verbose=True, the trigger polls job state and fetches
CloudWatch logs."""
+ glue_client = AsyncMock()
+ glue_client.get_job_run = AsyncMock(
+ side_effect=[
+ # First call: metadata + initial state (RUNNING)
+ {"JobRun": {"JobRunState": "RUNNING", "LogGroupName":
"/aws-glue/python-jobs"}},
+ # Second call: state update after sleep (SUCCEEDED)
+ {"JobRun": {"JobRunState": "SUCCEEDED", "LogGroupName":
"/aws-glue/python-jobs"}},
+ ]
+ )
+ mock_glue_conn.return_value.__aenter__ =
AsyncMock(return_value=glue_client)
+ mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(
+ return_value={
+ "events": [{"timestamp": 1234, "message": "Processing step
1\n"}],
+ "nextForwardToken": "token_1",
+ }
+ )
+ mock_logs_conn.return_value.__aenter__ =
AsyncMock(return_value=logs_client)
+ mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_delay=0,
+ waiter_max_attempts=5,
+ )
+ generator = trigger.run()
+ event = await generator.asend(None)
+
+ assert event.payload["status"] == "success"
+ assert event.payload["run_id"] == "jr_123"
+ # Logs client was called for both output and error streams
+ assert logs_client.get_log_events.call_count >= 2
+
+ @pytest.mark.asyncio
+ @mock.patch.object(AwsLogsHook, "get_async_conn")
+ @mock.patch.object(GlueJobHook, "get_async_conn")
+ async def test_verbose_run_job_failed(self, mock_glue_conn,
mock_logs_conn):
+ """When verbose=True and the job fails, the trigger yields an error
event."""
+ glue_client = AsyncMock()
+ glue_client.get_job_run = AsyncMock(
+ return_value={"JobRun": {"JobRunState": "FAILED", "LogGroupName":
"/aws-glue/python-jobs"}}
+ )
+ mock_glue_conn.return_value.__aenter__ =
AsyncMock(return_value=glue_client)
+ mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(return_value={"events": [],
"nextForwardToken": "token_1"})
+ mock_logs_conn.return_value.__aenter__ =
AsyncMock(return_value=logs_client)
+ mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_delay=0,
+ waiter_max_attempts=5,
+ )
+ generator = trigger.run()
+ event = await generator.asend(None)
+ assert event.payload["status"] == "error"
+ assert "FAILED" in event.payload["message"]
+ assert event.payload["run_id"] == "jr_123"
+
+ @pytest.mark.asyncio
+ @mock.patch.object(AwsLogsHook, "get_async_conn")
+ @mock.patch.object(GlueJobHook, "get_async_conn")
+ async def test_verbose_run_max_attempts(self, mock_glue_conn,
mock_logs_conn):
+ """When verbose=True and the job stays RUNNING past max attempts,
yields an error event."""
+ glue_client = AsyncMock()
+ glue_client.get_job_run = AsyncMock(
+ return_value={"JobRun": {"JobRunState": "RUNNING", "LogGroupName":
"/aws-glue/python-jobs"}}
+ )
+ mock_glue_conn.return_value.__aenter__ =
AsyncMock(return_value=glue_client)
+ mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(return_value={"events": [],
"nextForwardToken": "token_1"})
+ mock_logs_conn.return_value.__aenter__ =
AsyncMock(return_value=logs_client)
+ mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_delay=0,
+ waiter_max_attempts=2,
+ )
+ generator = trigger.run()
+ event = await generator.asend(None)
+ assert event.payload["status"] == "error"
+ assert "max attempts" in event.payload["message"]
+ assert event.payload["run_id"] == "jr_123"
+
+ @pytest.mark.asyncio
+ @mock.patch.object(AwsLogsHook, "get_async_conn")
+ @mock.patch.object(GlueJobHook, "get_async_conn")
+ async def test_verbose_run_cloudwatch_client_error(self, mock_glue_conn,
mock_logs_conn):
+ """When verbose=True and CloudWatch returns an unexpected ClientError,
yields error event."""
+ glue_client = AsyncMock()
+ glue_client.get_job_run = AsyncMock(
+ return_value={"JobRun": {"JobRunState": "RUNNING", "LogGroupName":
"/aws-glue/python-jobs"}}
+ )
+ mock_glue_conn.return_value.__aenter__ =
AsyncMock(return_value=glue_client)
+ mock_glue_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(
+ side_effect=ClientError(
+ {"Error": {"Code": "AccessDeniedException", "Message": "not
authorized"}},
+ "GetLogEvents",
+ )
+ )
+ mock_logs_conn.return_value.__aenter__ =
AsyncMock(return_value=logs_client)
+ mock_logs_conn.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ waiter_delay=0,
+ waiter_max_attempts=5,
+ )
+ generator = trigger.run()
+ event = await generator.asend(None)
+ assert event.payload["status"] == "error"
+ assert "Failed to fetch logs" in event.payload["message"]
+ assert "AccessDeniedException" in event.payload["message"]
+ assert event.payload["run_id"] == "jr_123"
+
+ @pytest.mark.asyncio
+ async def test_forward_logs_resource_not_found(self):
+ """_forward_logs handles ResourceNotFoundException gracefully."""
+ logs_client = AsyncMock()
+ logs_client.get_log_events = AsyncMock(
+ side_effect=ClientError(
+ {"Error": {"Code": "ResourceNotFoundException", "Message":
"not found"}},
+ "GetLogEvents",
+ )
+ )
+
+ trigger = GlueJobCompleteTrigger(
+ job_name="job_name",
+ run_id="jr_123",
+ verbose=True,
+ aws_conn_id="aws_conn_id",
+ region_name="us-east-1",
+ waiter_delay=0,
+ waiter_max_attempts=5,
+ )
+ result = await trigger._forward_logs(logs_client,
"/aws-glue/python-jobs/output", "jr_123", None)
+ assert result is None
+
+ @pytest.mark.asyncio
+ async def test_forward_logs_pagination(self, caplog):
+ """_forward_logs follows nextForwardToken and formats logs like the
sync path."""
Review Comment:
These caplog-based assertions depend on INFO-level log records from
`_forward_logs()`, but the test never sets a capture level. To avoid
environment-dependent flakiness, set `caplog.set_level("INFO")` (optionally for
the trigger module logger) before calling `_forward_logs()` in this test (and
the no-new-events test below).
```suggestion
        """_forward_logs follows nextForwardToken and formats logs like the sync path."""
        caplog.set_level("INFO")
```
##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/glue.py:
##########
@@ -87,6 +95,131 @@ def hook(self) -> AwsGenericHook:
config=self.botocore_config,
)
+ async def run(self) -> AsyncIterator[TriggerEvent]:
+ if not self.verbose:
+ async for event in super().run():
+ yield event
+ return
+
+ hook = self.hook()
+ async with (
+ await hook.get_async_conn() as glue_client,
+ await AwsLogsHook(
+ aws_conn_id=self.aws_conn_id, region_name=self.region_name
+ ).get_async_conn() as logs_client,
+ ):
+ # Get log group name from job run metadata and initial state in
one call
+ job_run_resp = await
glue_client.get_job_run(JobName=self.job_name, RunId=self.run_id)
+ log_group_output, log_group_error =
get_glue_log_group_names(job_run_resp["JobRun"])
+
+ output_token: str | None = None
+ error_token: str | None = None
+ job_run_state = job_run_resp["JobRun"]["JobRunState"]
+
+ for _attempt in range(self.attempts):
+ # Fetch and print logs from both output and error streams
+ try:
+ output_token = await self._forward_logs(
+ logs_client, log_group_output, self.run_id,
output_token
+ )
+ error_token = await self._forward_logs(
+ logs_client, log_group_error, self.run_id, error_token
+ )
+ except ClientError as e:
+ self.log.error(
+ "Failed to fetch logs for Glue Job %s Run %s: %s",
+ self.job_name,
+ self.run_id,
+ e,
+ )
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "message": f"Failed to fetch logs for Glue Job
{self.job_name} Run {self.run_id}: {e}",
+ self.return_key: self.return_value,
+ }
+ )
+ return
+
+ if job_run_state in ("FAILED", "TIMEOUT"):
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "message": f"Glue Job {self.job_name} Run
{self.run_id}"
+ f" exited with state: {job_run_state}",
+ self.return_key: self.return_value,
+ }
+ )
+ return
+ if job_run_state in ("SUCCEEDED", "STOPPED"):
+ self.log.info(
+ "Exiting Job %s Run %s State: %s",
+ self.job_name,
+ self.run_id,
+ job_run_state,
+ )
+ yield TriggerEvent({"status": "success", self.return_key:
self.return_value})
+ return
+
+ self.log.info(
+ "Polling for AWS Glue Job %s current run state: %s",
+ self.job_name,
+ job_run_state,
+ )
+ await asyncio.sleep(self.waiter_delay)
+
+ # Fetch updated state for next iteration
+ resp = await glue_client.get_job_run(JobName=self.job_name,
RunId=self.run_id)
+ job_run_state = resp["JobRun"]["JobRunState"]
+
Review Comment:
The polling loop updates `job_run_state` after the final `get_job_run()`
call, but never re-checks that updated state before falling out of the `for`
loop and yielding the "waiter exceeded max attempts" error. This can
incorrectly report an error when the job transitions to SUCCEEDED/FAILED on the
last poll (e.g., `waiter_max_attempts=1`). Restructure the loop to fetch/check
state at the start of each attempt, or perform a final terminal-state check
after the loop before emitting the max-attempts error; add a unit test covering
the `waiter_max_attempts=1` edge case.
```suggestion
            # Final check of the last fetched state before emitting the max-attempts error
            if job_run_state in ("FAILED", "TIMEOUT"):
                yield TriggerEvent(
                    {
                        "status": "error",
                        "message": f"Glue Job {self.job_name} Run {self.run_id}"
                        f" exited with state: {job_run_state}",
                        self.return_key: self.return_value,
                    }
                )
                return
            if job_run_state in ("SUCCEEDED", "STOPPED"):
                self.log.info(
                    "Exiting Job %s Run %s State: %s",
                    self.job_name,
                    self.run_id,
                    job_run_state,
                )
                yield TriggerEvent({"status": "success", self.return_key: self.return_value})
                return
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]