This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0cb97db8931 [OSSTaskHandler, CloudwatchTaskHandler, S3TaskHandler,
HdfsTaskHandler, ElasticsearchTaskHandler, GCSTaskHandler,
OpensearchTaskHandler, RedisTaskHandler, WasbTaskHandler] supports log file
size handling (#55455)
0cb97db8931 is described below
commit 0cb97db89311ec82435b83a5c248d99c6051ea65
Author: AutomationDev85 <[email protected]>
AuthorDate: Sun Sep 14 15:15:33 2025 +0200
[OSSTaskHandler, CloudwatchTaskHandler, S3TaskHandler, HdfsTaskHandler,
ElasticsearchTaskHandler, GCSTaskHandler, OpensearchTaskHandler,
RedisTaskHandler, WasbTaskHandler] supports log file size handling (#55455)
* WasbTaskHandler supports log file size handling
* OSSTaskHandler, CloudwatchTaskHandler, S3TaskHandler, HdfsTaskHandler,
ElasticsearchTaskHandler, GCSTaskHandler, OpensearchTaskHandler,
RedisTaskHandler support log file size handling
* Remove space
---------
Co-authored-by: AutomationDev85 <AutomationDev85>
---
.../providers/alibaba/cloud/log/oss_task_handler.py | 15 +++++++++++++--
.../providers/amazon/aws/log/cloudwatch_task_handler.py | 15 +++++++++++++--
.../airflow/providers/amazon/aws/log/s3_task_handler.py | 15 +++++++++++++--
.../providers/apache/hdfs/log/hdfs_task_handler.py | 15 +++++++++++++--
.../providers/elasticsearch/log/es_task_handler.py | 10 ++++++++--
.../providers/google/cloud/log/gcs_task_handler.py | 10 ++++++++--
.../providers/microsoft/azure/log/wasb_task_handler.py | 8 +++++++-
.../airflow/providers/opensearch/log/os_task_handler.py | 10 ++++++++--
.../src/airflow/providers/redis/log/redis_task_handler.py | 10 ++++++++--
9 files changed, 91 insertions(+), 17 deletions(-)
diff --git
a/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py
b/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py
index 949556a04c2..e69df540665 100644
---
a/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py
+++
b/providers/alibaba/src/airflow/providers/alibaba/cloud/log/oss_task_handler.py
@@ -163,9 +163,20 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
Extends airflow FileTaskHandler and uploads to and reads from OSS remote
storage.
"""
- def __init__(self, base_log_folder, oss_log_folder, **kwargs):
+ def __init__(
+ self,
+ base_log_folder: str,
+ oss_log_folder: str,
+ max_bytes: int = 0,
+ backup_count: int = 0,
+ delay: bool = False,
+ **kwargs,
+ ) -> None:
self.log.info("Using oss_task_handler for remote logging...")
- super().__init__(base_log_folder)
+ # support log file size handling of FileTaskHandler
+ super().__init__(
+ base_log_folder=base_log_folder, max_bytes=max_bytes,
backup_count=backup_count, delay=delay
+ )
self.log_relative_path = ""
self._hook = None
self.closed = False
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index 543b0d33cea..79d56e7b6ad 100644
---
a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++
b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -230,8 +230,19 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
trigger_should_wrap = True
- def __init__(self, base_log_folder: str, log_group_arn: str, **kwargs):
- super().__init__(base_log_folder)
+ def __init__(
+ self,
+ base_log_folder: str,
+ log_group_arn: str,
+ max_bytes: int = 0,
+ backup_count: int = 0,
+ delay: bool = False,
+ **kwargs,
+ ) -> None:
+ # support log file size handling of FileTaskHandler
+ super().__init__(
+ base_log_folder=base_log_folder, max_bytes=max_bytes,
backup_count=backup_count, delay=delay
+ )
split_arn = log_group_arn.split(":")
self.handler = None
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
index 24088ca0d9c..caf5fae0962 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -172,8 +172,19 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
It extends airflow FileTaskHandler and uploads to and reads from S3 remote
storage.
"""
- def __init__(self, base_log_folder: str, s3_log_folder: str, **kwargs):
- super().__init__(base_log_folder)
+ def __init__(
+ self,
+ base_log_folder: str,
+ s3_log_folder: str,
+ max_bytes: int = 0,
+ backup_count: int = 0,
+ delay: bool = False,
+ **kwargs,
+ ) -> None:
+ # support log file size handling of FileTaskHandler
+ super().__init__(
+ base_log_folder=base_log_folder, max_bytes=max_bytes,
backup_count=backup_count, delay=delay
+ )
self.handler: logging.FileHandler | None = None
self.remote_base = s3_log_folder
self.log_relative_path = ""
diff --git
a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py
b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py
index b0b89aad6c7..ed76365ac45 100644
---
a/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py
+++
b/providers/apache/hdfs/src/airflow/providers/apache/hdfs/log/hdfs_task_handler.py
@@ -84,8 +84,19 @@ class HdfsTaskHandler(FileTaskHandler, LoggingMixin):
It extends airflow FileTaskHandler and uploads to and reads from HDFS.
"""
- def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs):
- super().__init__(base_log_folder)
+ def __init__(
+ self,
+ base_log_folder: str,
+ hdfs_log_folder: str,
+ max_bytes: int = 0,
+ backup_count: int = 0,
+ delay: bool = False,
+ **kwargs,
+ ) -> None:
+ # support log file size handling of FileTaskHandler
+ super().__init__(
+ base_log_folder=base_log_folder, max_bytes=max_bytes,
backup_count=backup_count, delay=delay
+ )
self.handler: logging.FileHandler | None = None
self.remote_base = urlsplit(hdfs_log_folder).path
self.log_relative_path = ""
diff --git
a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
index 93532b4efda..cb16583ab61 100644
---
a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
+++
b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -162,13 +162,19 @@ class ElasticsearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMixin)
index_patterns: str = conf.get("elasticsearch", "index_patterns"),
index_patterns_callable: str = conf.get("elasticsearch",
"index_patterns_callable", fallback=""),
es_kwargs: dict | None | Literal["default_es_kwargs"] =
"default_es_kwargs",
+ max_bytes: int = 0,
+ backup_count: int = 0,
+ delay: bool = False,
**kwargs,
- ):
+ ) -> None:
es_kwargs = es_kwargs or {}
if es_kwargs == "default_es_kwargs":
es_kwargs = get_es_kwargs_from_config()
self.host = self.format_url(host)
- super().__init__(base_log_folder)
+ # support log file size handling of FileTaskHandler
+ super().__init__(
+ base_log_folder=base_log_folder, max_bytes=max_bytes,
backup_count=backup_count, delay=delay
+ )
self.closed = False
self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs)
diff --git
a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
index 6c30f78a6a8..5ad1e0c6d86 100644
---
a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
+++
b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -213,9 +213,15 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
gcp_keyfile_dict: dict | None = None,
gcp_scopes: Collection[str] | None = _DEFAULT_SCOPESS,
project_id: str = PROVIDE_PROJECT_ID,
+ max_bytes: int = 0,
+ backup_count: int = 0,
+ delay: bool = False,
**kwargs,
- ):
- super().__init__(base_log_folder)
+ ) -> None:
+ # support log file size handling of FileTaskHandler
+ super().__init__(
+ base_log_folder=base_log_folder, max_bytes=max_bytes,
backup_count=backup_count, delay=delay
+ )
self.handler: logging.FileHandler | None = None
self.log_relative_path = ""
self.closed = False
diff --git
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py
index 017cdae94a8..a51625eb039 100644
---
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py
+++
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py
@@ -188,9 +188,15 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
base_log_folder: str,
wasb_log_folder: str,
wasb_container: str,
+ max_bytes: int = 0,
+ backup_count: int = 0,
+ delay: bool = False,
**kwargs,
) -> None:
- super().__init__(base_log_folder)
+ # support log file size handling of FileTaskHandler
+ super().__init__(
+ base_log_folder=base_log_folder, max_bytes=max_bytes,
backup_count=backup_count, delay=delay
+ )
self.handler: logging.FileHandler | None = None
self.log_relative_path = ""
self.closed = False
diff --git
a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
index 0c5d8f6c9f3..b3d4f98507e 100644
---
a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
+++
b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
@@ -165,11 +165,17 @@ class OpensearchTaskHandler(FileTaskHandler,
ExternalLoggingMixin, LoggingMixin)
index_patterns: str = conf.get("opensearch", "index_patterns",
fallback="_all"),
index_patterns_callable: str = conf.get("opensearch",
"index_patterns_callable", fallback=""),
os_kwargs: dict | None | Literal["default_os_kwargs"] =
"default_os_kwargs",
- ):
+ max_bytes: int = 0,
+ backup_count: int = 0,
+ delay: bool = False,
+ ) -> None:
os_kwargs = os_kwargs or {}
if os_kwargs == "default_os_kwargs":
os_kwargs = get_os_kwargs_from_config()
- super().__init__(base_log_folder)
+ # support log file size handling of FileTaskHandler
+ super().__init__(
+ base_log_folder=base_log_folder, max_bytes=max_bytes,
backup_count=backup_count, delay=delay
+ )
self.closed = False
self.mark_end_on_close = True
self.end_of_log_mark = end_of_log_mark.strip()
diff --git
a/providers/redis/src/airflow/providers/redis/log/redis_task_handler.py
b/providers/redis/src/airflow/providers/redis/log/redis_task_handler.py
index bdc5b6ed0f8..86419f2142e 100644
--- a/providers/redis/src/airflow/providers/redis/log/redis_task_handler.py
+++ b/providers/redis/src/airflow/providers/redis/log/redis_task_handler.py
@@ -61,8 +61,14 @@ class RedisTaskHandler(FileTaskHandler, LoggingMixin):
max_lines: int = 10000,
ttl_seconds: int = 60 * 60 * 24 * 28,
conn_id: str | None = None,
- ):
- super().__init__(base_log_folder)
+ max_bytes: int = 0,
+ backup_count: int = 0,
+ delay: bool = False,
+ ) -> None:
+ # support log file size handling of FileTaskHandler
+ super().__init__(
+ base_log_folder=base_log_folder, max_bytes=max_bytes,
backup_count=backup_count, delay=delay
+ )
self.handler: _RedisHandler | None = None
self.max_lines = max_lines
self.ttl_seconds = ttl_seconds