This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0cb97db8931 [OSSTaskHandler, CloudwatchTaskHandler, S3TaskHandler, 
HdfsTaskHandler, ElasticsearchTaskHandler, GCSTaskHandler, 
OpensearchTaskHandler, RedisTaskHandler, WasbTaskHandler] supports log file 
size handling (#55455)
0cb97db8931 is described below

commit 0cb97db89311ec82435b83a5c248d99c6051ea65
Author: AutomationDev85 <[email protected]>
AuthorDate: Sun Sep 14 15:15:33 2025 +0200

    [OSSTaskHandler, CloudwatchTaskHandler, S3TaskHandler, HdfsTaskHandler, 
ElasticsearchTaskHandler, GCSTaskHandler, OpensearchTaskHandler, 
RedisTaskHandler, WasbTaskHandler] supports log file size handling (#55455)
    
    * WasbTaskHandler supports log file size handling
    
    * OSSTaskHandler, CloudwatchTaskHandler, S3TaskHandler, HdfsTaskHandler, 
ElasticsearchTaskHandler, GCSTaskHandler, OpensearchTaskHandler, 
RedisTaskHandler support log file size handling
    
    * Remove space
    
    ---------
    
    Co-authored-by: AutomationDev85 <AutomationDev85>
---
 .../providers/alibaba/cloud/log/oss_task_handler.py       | 15 +++++++++++++--
 .../providers/amazon/aws/log/cloudwatch_task_handler.py   | 15 +++++++++++++--
 .../airflow/providers/amazon/aws/log/s3_task_handler.py   | 15 +++++++++++++--
 .../providers/apache/hdfs/log/hdfs_task_handler.py        | 15 +++++++++++++--
 .../providers/elasticsearch/log/es_task_handler.py        | 10 ++++++++--
 .../providers/google/cloud/log/gcs_task_handler.py        | 10 ++++++++--
 .../providers/microsoft/azure/log/wasb_task_handler.py    |  8 +++++++-
 .../airflow/providers/opensearch/log/os_task_handler.py   | 10 ++++++++--
 .../src/airflow/providers/redis/log/redis_task_handler.py | 10 ++++++++--
 9 files changed, 91 insertions(+), 17 deletions(-)

diff --git a/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py
index 949556a04c2..e69df540665 100644
--- a/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py
+++ b/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py
@@ -163,9 +163,20 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
     Extends airflow FileTaskHandler and uploads to and reads from OSS remote storage.
     """
 
-    def __init__(self, base_log_folder, oss_log_folder, **kwargs):
+    def __init__(
+        self,
+        base_log_folder: str,
+        oss_log_folder: str,
+        max_bytes: int = 0,
+        backup_count: int = 0,
+        delay: bool = False,
+        **kwargs,
+    ) -> None:
         self.log.info("Using oss_task_handler for remote logging...")
-        super().__init__(base_log_folder)
+        # support log file size handling of FileTaskHandler
+        super().__init__(
+            base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
+        )
         self.log_relative_path = ""
         self._hook = None
         self.closed = False
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index 543b0d33cea..79d56e7b6ad 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -230,8 +230,19 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
 
     trigger_should_wrap = True
 
-    def __init__(self, base_log_folder: str, log_group_arn: str, **kwargs):
-        super().__init__(base_log_folder)
+    def __init__(
+        self,
+        base_log_folder: str,
+        log_group_arn: str,
+        max_bytes: int = 0,
+        backup_count: int = 0,
+        delay: bool = False,
+        **kwargs,
+    ) -> None:
+        # support log file size handling of FileTaskHandler
+        super().__init__(
+            base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
+        )
         split_arn = log_group_arn.split(":")
 
         self.handler = None
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
index 24088ca0d9c..caf5fae0962 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -172,8 +172,19 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
     It extends airflow FileTaskHandler and uploads to and reads from S3 remote storage.
     """
 
-    def __init__(self, base_log_folder: str, s3_log_folder: str, **kwargs):
-        super().__init__(base_log_folder)
+    def __init__(
+        self,
+        base_log_folder: str,
+        s3_log_folder: str,
+        max_bytes: int = 0,
+        backup_count: int = 0,
+        delay: bool = False,
+        **kwargs,
+    ) -> None:
+        # support log file size handling of FileTaskHandler
+        super().__init__(
+            base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
+        )
         self.handler: logging.FileHandler | None = None
         self.remote_base = s3_log_folder
         self.log_relative_path = ""
diff --git a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py
index b0b89aad6c7..ed76365ac45 100644
--- a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py
+++ b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py
@@ -84,8 +84,19 @@ class HdfsTaskHandler(FileTaskHandler, LoggingMixin):
     It extends airflow FileTaskHandler and uploads to and reads from HDFS.
     """
 
-    def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs):
-        super().__init__(base_log_folder)
+    def __init__(
+        self,
+        base_log_folder: str,
+        hdfs_log_folder: str,
+        max_bytes: int = 0,
+        backup_count: int = 0,
+        delay: bool = False,
+        **kwargs,
+    ) -> None:
+        # support log file size handling of FileTaskHandler
+        super().__init__(
+            base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
+        )
         self.handler: logging.FileHandler | None = None
         self.remote_base = urlsplit(hdfs_log_folder).path
         self.log_relative_path = ""
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
index 93532b4efda..cb16583ab61 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -162,13 +162,19 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         index_patterns: str = conf.get("elasticsearch", "index_patterns"),
         index_patterns_callable: str = conf.get("elasticsearch", "index_patterns_callable", fallback=""),
         es_kwargs: dict | None | Literal["default_es_kwargs"] = "default_es_kwargs",
+        max_bytes: int = 0,
+        backup_count: int = 0,
+        delay: bool = False,
         **kwargs,
-    ):
+    ) -> None:
         es_kwargs = es_kwargs or {}
         if es_kwargs == "default_es_kwargs":
             es_kwargs = get_es_kwargs_from_config()
         self.host = self.format_url(host)
-        super().__init__(base_log_folder)
+        # support log file size handling of FileTaskHandler
+        super().__init__(
+            base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
+        )
         self.closed = False
 
         self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs)
diff --git a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
index 6c30f78a6a8..5ad1e0c6d86 100644
--- a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -213,9 +213,15 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         gcp_keyfile_dict: dict | None = None,
         gcp_scopes: Collection[str] | None = _DEFAULT_SCOPESS,
         project_id: str = PROVIDE_PROJECT_ID,
+        max_bytes: int = 0,
+        backup_count: int = 0,
+        delay: bool = False,
         **kwargs,
-    ):
-        super().__init__(base_log_folder)
+    ) -> None:
+        # support log file size handling of FileTaskHandler
+        super().__init__(
+            base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
+        )
         self.handler: logging.FileHandler | None = None
         self.log_relative_path = ""
         self.closed = False
diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py
index 017cdae94a8..a51625eb039 100644
--- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py
+++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py
@@ -188,9 +188,15 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
         base_log_folder: str,
         wasb_log_folder: str,
         wasb_container: str,
+        max_bytes: int = 0,
+        backup_count: int = 0,
+        delay: bool = False,
         **kwargs,
     ) -> None:
-        super().__init__(base_log_folder)
+        # support log file size handling of FileTaskHandler
+        super().__init__(
+            base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
+        )
         self.handler: logging.FileHandler | None = None
         self.log_relative_path = ""
         self.closed = False
diff --git a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
index 0c5d8f6c9f3..b3d4f98507e 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
+++ b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
@@ -165,11 +165,17 @@ class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin)
         index_patterns: str = conf.get("opensearch", "index_patterns", fallback="_all"),
         index_patterns_callable: str = conf.get("opensearch", "index_patterns_callable", fallback=""),
         os_kwargs: dict | None | Literal["default_os_kwargs"] = "default_os_kwargs",
-    ):
+        max_bytes: int = 0,
+        backup_count: int = 0,
+        delay: bool = False,
+    ) -> None:
         os_kwargs = os_kwargs or {}
         if os_kwargs == "default_os_kwargs":
             os_kwargs = get_os_kwargs_from_config()
-        super().__init__(base_log_folder)
+        # support log file size handling of FileTaskHandler
+        super().__init__(
+            base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
+        )
         self.closed = False
         self.mark_end_on_close = True
         self.end_of_log_mark = end_of_log_mark.strip()
diff --git a/providers/redis/src/airflow/providers/redis/log/redis_task_handler.py b/providers/redis/src/airflow/providers/redis/log/redis_task_handler.py
index bdc5b6ed0f8..86419f2142e 100644
--- a/providers/redis/src/airflow/providers/redis/log/redis_task_handler.py
+++ b/providers/redis/src/airflow/providers/redis/log/redis_task_handler.py
@@ -61,8 +61,14 @@ class RedisTaskHandler(FileTaskHandler, LoggingMixin):
         max_lines: int = 10000,
         ttl_seconds: int = 60 * 60 * 24 * 28,
         conn_id: str | None = None,
-    ):
-        super().__init__(base_log_folder)
+        max_bytes: int = 0,
+        backup_count: int = 0,
+        delay: bool = False,
+    ) -> None:
+        # support log file size handling of FileTaskHandler
+        super().__init__(
+            base_log_folder=base_log_folder, max_bytes=max_bytes, backup_count=backup_count, delay=delay
+        )
         self.handler: _RedisHandler | None = None
         self.max_lines = max_lines
         self.ttl_seconds = ttl_seconds

Reply via email to