Copilot commented on code in PR #64198:
URL: https://github.com/apache/airflow/pull/64198#discussion_r3025334331
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -4360,6 +4360,53 @@ class CustomOperator(BaseOperator):
expected_exception_logs.insert(index, calls)
assert log.exception.mock_calls == expected_exception_logs
+ def test_airflow_fail_exception_in_on_retry_callback_marks_task_as_failed(
+ self, create_runtime_ti, mock_supervisor_comms
+ ):
+ """AirflowFailException raised in on_retry_callback must prevent retries and mark task as FAILED."""
+ from airflow.sdk.exceptions import AirflowFailException
+
Review Comment:
`AirflowFailException` is already imported at the top of this test module,
so this additional in-test import is redundant. Removing it will reduce noise
and keep imports consistent within the file.
```suggestion
```
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1538,10 +1538,14 @@ def _run_task_state_change_callbacks(
context: Context,
log: Logger,
) -> None:
+ from airflow.sdk.exceptions import AirflowFailException
+
Review Comment:
`AirflowFailException` is already imported at module scope and is used in
multiple places in this file, so importing it again inside
`_run_task_state_change_callbacks` is unnecessary. It also goes against the
project guideline to keep imports at module level unless there’s a clear
reason (circular import, isolation, or lazy loading). Consider dropping the
local import and relying on the existing module-level
`from airflow.sdk.exceptions import (...)` import list.
```suggestion
```
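As a rough illustration of the guideline above, here is a minimal sketch that uses the standard library's `json` module as a stand-in for `airflow.sdk.exceptions`. The function name `parse_or_none` is hypothetical and does not come from the PR; the point is only that a name imported once at module scope can be reused inside any function without a local import.

```python
import json
from json import JSONDecodeError  # imported once, at module scope


def parse_or_none(text: str):
    """Return the decoded JSON value, or None if the text is not valid JSON."""
    try:
        return json.loads(text)
    except JSONDecodeError:  # reuses the module-level name; no local import needed
        return None
```

A local import would still work, but it repeats the import statement on every call and hides the dependency from readers scanning the top of the file.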
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -4360,6 +4360,53 @@ class CustomOperator(BaseOperator):
expected_exception_logs.insert(index, calls)
assert log.exception.mock_calls == expected_exception_logs
+ def test_airflow_fail_exception_in_on_retry_callback_marks_task_as_failed(
+ self, create_runtime_ti, mock_supervisor_comms
+ ):
+ """AirflowFailException raised in on_retry_callback must prevent retries and mark task as FAILED."""
+ from airflow.sdk.exceptions import AirflowFailException
+
+ failure_callback_called = []
+ retry_callback_called = []
+
+ def retry_callback(context):
+ retry_callback_called.append(True)
+ raise AirflowFailException("No more retries!")
+
+ def failure_callback(context):
+ failure_callback_called.append(True)
+
+ class FailingOperator(BaseOperator):
+ def execute(self, context):
+ raise AirflowException("Task failed")
+
+ task = FailingOperator(
+ task_id="task",
+ on_retry_callback=retry_callback,
+ on_failure_callback=failure_callback,
+ )
+ runtime_ti = create_runtime_ti(dag_id="dag", task=task, should_retry=True)
+ log = mock.MagicMock()
Review Comment:
This test introduces an unspecced `mock.MagicMock()` for `log`. The repo’s
testing guidelines prefer mocks created with `spec`/`autospec`, so that access
to an invalid attribute fails instead of being silently accepted, e.g.
`mock.MagicMock(spec=FilteringBoundLogger)` or whichever logger interface
`run()` expects.
```suggestion
log = mock.MagicMock(spec=type(runtime_ti.log))
```
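To make the difference concrete, here is a small self-contained sketch. `HypotheticalLogger` is an invented stand-in for whichever logger interface the code under test expects; it is not an Airflow or structlog class. The sketch shows that an unspecced mock accepts a misspelled method name, while a specced mock raises `AttributeError`.

```python
from unittest import mock


class HypotheticalLogger:
    """Invented stand-in for the logger interface expected by the code under test."""

    def info(self, msg: str) -> None: ...

    def exception(self, msg: str) -> None: ...


# Unspecced mock: a misspelled method is silently accepted.
loose_log = mock.MagicMock()
loose_log.excepton("typo goes unnoticed")

# Specced mock: only attributes that exist on the spec class are allowed.
strict_log = mock.MagicMock(spec=HypotheticalLogger)
strict_log.exception("recorded as usual")

try:
    strict_log.excepton("typo is caught")
except AttributeError as err:
    typo_error = err
else:
    typo_error = None
```

With the specced mock, a test that asserts on `log.exception.mock_calls` cannot pass by accident because of a misspelled or removed logger method.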
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1801,15 +1806,38 @@ def finalize(
except Exception:
log.exception("error calling listener")
elif state == TaskInstanceState.UP_FOR_RETRY:
- _run_task_state_change_callbacks(task, "on_retry_callback", context, log)
+ from airflow.sdk.exceptions import AirflowFailException
+
+ try:
+ _run_task_state_change_callbacks(task, "on_retry_callback", context, log)
Review Comment:
Similar to `_run_task_state_change_callbacks`, this local import of
`AirflowFailException` is unnecessary (it’s already available at module scope
in this file) and adds repeated imports in a hot path. Prefer using the
existing module-level import unless there’s a specific circular-import /
isolation reason.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.