This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 672a39efa1feb49336209e00ff2a7e64b0bd70fa Author: Sebastián Ortega <[email protected]> AuthorDate: Tue Sep 16 11:03:07 2025 +0200 Fix error when retrieving logs of ti not run because of upstream failures (#55517) * Fix TestFileTaskLogHandler tests * Return a placeholder message when the logs of tasks skipped because of upstream failures are requested (cherry picked from commit b069f1fdbf833488a8da885d62734258fc3a73eb) --- .../src/airflow/utils/log/file_task_handler.py | 5 ++++- airflow-core/tests/unit/utils/test_log_handlers.py | 25 +++++++++++----------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index 3669aae122b..9954688a41f 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -741,7 +741,10 @@ class FileTaskHandler(logging.Handler): if try_number is None: try_number = task_instance.try_number - if try_number == 0 and task_instance.state == TaskInstanceState.SKIPPED: + if try_number == 0 and task_instance.state in ( + TaskInstanceState.SKIPPED, + TaskInstanceState.UPSTREAM_FAILED, + ): logs = [StructuredLogMessage(event="Task was skipped, no logs available.")] return chain(logs), {"end_of_log": True} diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py b/airflow-core/tests/unit/utils/test_log_handlers.py index b5d939d52e7..6d2caaa8a43 100644 --- a/airflow-core/tests/unit/utils/test_log_handlers.py +++ b/airflow-core/tests/unit/utils/test_log_handlers.py @@ -79,7 +79,7 @@ from tests_common.test_utils.file_task_handler import ( ) from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker -pytestmark = [pytest.mark.db_test, pytest.mark.xfail()] +pytestmark = [pytest.mark.db_test] DEFAULT_DATE = pendulum.datetime(2016, 1, 1) TASK_LOGGER = "airflow.task" @@ -156,7 +156,8 @@ class TestFileTaskLogHandler: # Remove the 
generated tmp log file. os.remove(log_filename) - def test_file_task_handler_when_ti_is_skipped(self, dag_maker): + @pytest.mark.parametrize("ti_state", [TaskInstanceState.SKIPPED, TaskInstanceState.UPSTREAM_FAILED]) + def test_file_task_handler_when_ti_is_not_run(self, dag_maker, ti_state): def task_callable(ti): ti.log.info("test") @@ -170,10 +171,10 @@ class TestFileTaskLogHandler: ti = TaskInstance(task=task, run_id=dagrun.run_id, dag_version_id=dag_version.id) ti.try_number = 0 - ti.state = State.SKIPPED + ti.state = ti_state - logger = ti.log - ti.log.disabled = False + logger = logging.getLogger(TASK_LOGGER) + logger.disabled = False file_handler = next( (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None @@ -295,8 +296,8 @@ class TestFileTaskLogHandler: ti.executor = executor_name ti.try_number = 1 ti.state = TaskInstanceState.RUNNING - logger = ti.log - ti.log.disabled = False + logger = logging.getLogger(TASK_LOGGER) + logger.disabled = False file_handler = next( (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None @@ -344,8 +345,8 @@ class TestFileTaskLogHandler: ti.try_number = 2 ti.state = State.RUNNING - logger = ti.log - ti.log.disabled = False + logger = logging.getLogger(TASK_LOGGER) + logger.disabled = False file_handler = next( (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None @@ -396,8 +397,8 @@ class TestFileTaskLogHandler: ti.try_number = 1 ti.state = State.RUNNING - logger = ti.log - ti.log.disabled = False + logger = logging.getLogger(TASK_LOGGER) + logger.disabled = False file_handler = next( (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None @@ -413,7 +414,7 @@ class TestFileTaskLogHandler: assert log_filename.endswith("1.log"), log_filename # mock to generate 2000 lines of log, the total size is larger than max_bytes_size - for i in range(1, 2000): + for i in range(1, 3000): logger.info("this is a Test. 
%s", i) # this is the rotate log file
