This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2b31f373ef Remove remaining Airflow 2.6 backcompat code from Amazon Provider (#36324)
2b31f373ef is described below
commit 2b31f373ef92c2b793f3f484192aa7b7fc88a7b6
Author: Andrey Anshin <[email protected]>
AuthorDate: Wed Dec 20 14:06:08 2023 +0400
Remove remaining Airflow 2.6 backcompat code from Amazon Provider (#36324)
---
.../providers/amazon/aws/log/s3_task_handler.py | 46 +---------------------
.../amazon/aws/log/test_s3_task_handler.py | 39 ++----------------
2 files changed, 6 insertions(+), 79 deletions(-)
diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py
index f3664f7c41..96cf54478a 100644
--- a/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -24,8 +24,6 @@ import shutil
from functools import cached_property
from typing import TYPE_CHECKING
-from packaging.version import Version
-
from airflow.configuration import conf
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.log.file_task_handler import FileTaskHandler
@@ -35,18 +33,6 @@ if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance
-def get_default_delete_local_copy():
- """Load delete_local_logs conf if Airflow version > 2.6 and return False
if not.
-
- TODO: delete this function when min airflow version >= 2.6
- """
- from airflow.version import version
-
- if Version(version) < Version("2.6"):
- return False
- return conf.getboolean("logging", "delete_local_logs")
-
-
class S3TaskHandler(FileTaskHandler, LoggingMixin):
"""
S3TaskHandler is a python log handler that handles and reads task instance logs.
@@ -66,8 +52,8 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
self._hook = None
self.closed = False
self.upload_on_close = True
- self.delete_local_copy = (
- kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy()
+ self.delete_local_copy = kwargs.get(
+ "delete_local_copy", conf.getboolean("logging", "delete_local_logs")
)
@cached_property
@@ -145,34 +131,6 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
messages.append(f"No logs found on s3 for ti={ti}")
return messages, logs
- def _read(self, ti, try_number, metadata=None):
- """
- Read logs of given task instance and try_number from S3 remote storage.
-
- If failed, read the log from task instance host machine.
-
- todo: when min airflow version >= 2.6 then remove this method (``_read``)
-
- :param ti: task instance object
- :param try_number: task instance try_number to read logs from
- :param metadata: log metadata,
- can be used for steaming log reading and auto-tailing.
- """
- # from airflow 2.6 we no longer implement the _read method
- if hasattr(super(), "_read_remote_logs"):
- return super()._read(ti, try_number, metadata)
- # if we get here, we're on airflow < 2.6 and we use this backcompat logic
- messages, logs = self._read_remote_logs(ti, try_number, metadata)
- if logs:
- return "".join(f"*** {x}\n" for x in messages) + "\n".join(logs),
{"end_of_log": True}
- else:
- if metadata and metadata.get("log_pos", 0) > 0:
- log_prefix = ""
- else:
- log_prefix = "*** Falling back to local log\n"
- local_log, metadata = super()._read(ti, try_number, metadata)
- return f"{log_prefix}{local_log}", metadata
-
def s3_log_exists(self, remote_log_location: str) -> bool:
"""
Check if remote_log_location exists in remote storage.
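With the provider's minimum supported Airflow version now at 2.6, the "logging/delete_local_logs" option and FileTaskHandler._read_remote_logs are always present, so both the version-gated default helper and the _read fallback removed above were dead code. A minimal standalone sketch of the surviving lookup semantics, with a plain dict standing in for Airflow's conf (the names and the False default here are illustrative assumptions, not the provider's API):

    # Illustrative sketch only, not the provider's code.
    CONF = {("logging", "delete_local_logs"): False}  # assumed default value

    def resolve_delete_local_copy(**kwargs):
        # An explicit kwarg wins; otherwise fall back to the configured value.
        # Same semantics as the kwargs.get(...) call in the new __init__,
        # minus the pre-2.6 version check the deleted helper performed.
        return kwargs.get("delete_local_copy", CONF[("logging", "delete_local_logs")])

    assert resolve_delete_local_copy(delete_local_copy=True) is True
    assert resolve_delete_local_copy() is False

One side effect worth noting: as an eagerly evaluated argument to dict.get, conf.getboolean(...) now runs even when the kwarg is supplied, whereas the removed ternary only consulted the configuration when the key was absent. Both paths are cheap, so the behavior is equivalent in practice.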
diff --git a/tests/providers/amazon/aws/log/test_s3_task_handler.py b/tests/providers/amazon/aws/log/test_s3_task_handler.py
index 758bf1244a..67d5d3257f 100644
--- a/tests/providers/amazon/aws/log/test_s3_task_handler.py
+++ b/tests/providers/amazon/aws/log/test_s3_task_handler.py
@@ -148,33 +148,6 @@ class TestS3TaskHandler:
assert actual == expected
assert {"end_of_log": True, "log_pos": 0} == metadata[0]
- def test_read_when_s3_log_missing_and_log_pos_missing_pre_26(self):
- ti = copy.copy(self.ti)
- ti.state = TaskInstanceState.SUCCESS
- # mock that super class has no _read_remote_logs method
- with mock.patch("airflow.providers.amazon.aws.log.s3_task_handler.hasattr", return_value=False):
- log, metadata = self.s3_task_handler.read(ti)
- assert 1 == len(log)
- assert log[0][0][-1].startswith("*** Falling back to local log")
-
- def test_read_when_s3_log_missing_and_log_pos_zero_pre_26(self):
- ti = copy.copy(self.ti)
- ti.state = TaskInstanceState.SUCCESS
- # mock that super class has no _read_remote_logs method
- with mock.patch("airflow.providers.amazon.aws.log.s3_task_handler.hasattr", return_value=False):
- log, metadata = self.s3_task_handler.read(ti, metadata={"log_pos": 0})
- assert 1 == len(log)
- assert log[0][0][-1].startswith("*** Falling back to local log")
-
- def test_read_when_s3_log_missing_and_log_pos_over_zero_pre_26(self):
- ti = copy.copy(self.ti)
- ti.state = TaskInstanceState.SUCCESS
- # mock that super class has no _read_remote_logs method
- with mock.patch("airflow.providers.amazon.aws.log.s3_task_handler.hasattr", return_value=False):
- log, metadata = self.s3_task_handler.read(ti, metadata={"log_pos": 1})
- assert 1 == len(log)
- assert not log[0][0][-1].startswith("*** Falling back to local log")
-
def test_s3_read_when_log_missing(self):
handler = self.s3_task_handler
url = "s3://bucket/foo"
@@ -240,15 +213,11 @@ class TestS3TaskHandler:
boto3.resource("s3").Object("bucket", self.remote_log_key).get()
@pytest.mark.parametrize(
- "delete_local_copy, expected_existence_of_local_copy, airflow_version",
- [(True, False, "2.6.0"), (False, True, "2.6.0"), (True, True, "2.5.0"), (False, True, "2.5.0")],
+ "delete_local_copy, expected_existence_of_local_copy",
+ [(True, False), (False, True)],
)
- def test_close_with_delete_local_logs_conf(
- self, delete_local_copy, expected_existence_of_local_copy, airflow_version
- ):
- with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}), mock.patch(
- "airflow.version.version", airflow_version
- ):
+ def test_close_with_delete_local_logs_conf(self, delete_local_copy, expected_existence_of_local_copy):
+ with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}):
handler = S3TaskHandler(self.local_log_location, self.remote_log_base)
handler.log.info("test")
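With the 2.5.0 rows gone, the parametrization reduces to the invariant that deleting the local copy and the copy still existing afterwards are mutually exclusive. A self-contained sketch of that reduced matrix (the test name here is hypothetical; pytest is assumed, as in the suite above):

    # Illustrative sketch only; mirrors the reduced parametrize matrix.
    import pytest

    @pytest.mark.parametrize(
        "delete_local_copy, expected_existence_of_local_copy",
        [(True, False), (False, True)],
    )
    def test_local_copy_expectation(delete_local_copy, expected_existence_of_local_copy):
        # The flag and the expected existence are always inverses of each
        # other, which is why the airflow_version axis could be dropped.
        assert delete_local_copy != expected_existence_of_local_copy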