This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b0e876b470 feat: add `write_to_os` writing task logs to opensearch 
(#64364)
4b0e876b470 is described below

commit 4b0e876b470fcf47933f26898db2bf554c832ab1
Author: Owen Leung <[email protected]>
AuthorDate: Tue Apr 7 21:15:02 2026 +0800

    feat: add `write_to_os` writing task logs to opensearch (#64364)
    
    * feat: restore opensearch provider & add write_to_os feature
    
    * test: fix failing CI
    
    * test: revert test info command
    
    * revise for copilot comments
    
    * revert core changes
    
    * fix breeze unit test
    
    * Update providers/opensearch/provider.yaml
    
    * Update providers/opensearch/provider.yaml
    
    * update pyproejct toml
    
    * fix failing ci
    
    ---------
    
    Co-authored-by: Elad Kalif <[email protected]>
---
 dev/breeze/src/airflow_breeze/global_constants.py  |   2 +-
 .../tests/test_pytest_args_for_test_types.py       |   1 +
 providers/opensearch/docs/index.rst                |   4 +-
 providers/opensearch/docs/logging/index.rst        |  22 +
 providers/opensearch/provider.yaml                 |  14 +
 providers/opensearch/pyproject.toml                |   2 +-
 .../src/airflow/providers/opensearch/__init__.py   |   4 +-
 .../providers/opensearch/get_provider_info.py      |  14 +
 .../providers/opensearch/log/os_task_handler.py    | 399 +++++++--
 .../airflow/providers/opensearch/version_compat.py |   2 +
 .../opensearch => tests/integration}/__init__.py   |  24 +-
 .../integration}/opensearch/__init__.py            |  23 -
 .../integration/opensearch/log}/__init__.py        |  23 -
 .../opensearch/log/test_os_remote_log_io.py        | 140 ++++
 .../opensearch/tests/unit/opensearch/conftest.py   |  49 --
 .../unit/opensearch/log/test_os_task_handler.py    | 913 +++++++++++----------
 16 files changed, 1011 insertions(+), 625 deletions(-)

diff --git a/dev/breeze/src/airflow_breeze/global_constants.py 
b/dev/breeze/src/airflow_breeze/global_constants.py
index efba238c4de..d7af8a84b14 100644
--- a/dev/breeze/src/airflow_breeze/global_constants.py
+++ b/dev/breeze/src/airflow_breeze/global_constants.py
@@ -767,7 +767,7 @@ PROVIDERS_COMPATIBILITY_TESTS_MATRIX: list[dict[str, str | 
list[str]]] = [
     {
         "python-version": "3.10",
         "airflow-version": "2.11.1",
-        "remove-providers": "common.messaging edge3 fab git keycloak 
informatica common.ai",
+        "remove-providers": "common.messaging edge3 fab git keycloak 
informatica common.ai opensearch",
         "run-unit-tests": "true",
     },
     {
diff --git a/dev/breeze/tests/test_pytest_args_for_test_types.py 
b/dev/breeze/tests/test_pytest_args_for_test_types.py
index 91a2a54b328..423c96eb10f 100644
--- a/dev/breeze/tests/test_pytest_args_for_test_types.py
+++ b/dev/breeze/tests/test_pytest_args_for_test_types.py
@@ -70,6 +70,7 @@ def _find_all_integration_folders() -> list[str]:
                 "providers/microsoft/mssql/tests/integration",
                 "providers/mongo/tests/integration",
                 "providers/openlineage/tests/integration",
+                "providers/opensearch/tests/integration",
                 "providers/qdrant/tests/integration",
                 "providers/redis/tests/integration",
                 "providers/trino/tests/integration",
diff --git a/providers/opensearch/docs/index.rst 
b/providers/opensearch/docs/index.rst
index ad55afc5f52..c339ac6c296 100644
--- a/providers/opensearch/docs/index.rst
+++ b/providers/opensearch/docs/index.rst
@@ -96,12 +96,12 @@ For the minimum Airflow version supported, see 
``Requirements`` below.
 Requirements
 ------------
 
-The minimum Apache Airflow version supported by this provider distribution is 
``2.11.0``.
+The minimum Apache Airflow version supported by this provider distribution is 
``3.0.0``.
 
 ==========================================  ==================
 PIP package                                 Version required
 ==========================================  ==================
-``apache-airflow``                          ``>=2.11.0``
+``apache-airflow``                          ``>=3.0.0``
 ``apache-airflow-providers-common-compat``  ``>=1.12.0``
 ``opensearch-py``                           ``>=2.2.0``
 ==========================================  ==================
diff --git a/providers/opensearch/docs/logging/index.rst 
b/providers/opensearch/docs/logging/index.rst
index 9e27d8bd66a..22c0314223b 100644
--- a/providers/opensearch/docs/logging/index.rst
+++ b/providers/opensearch/docs/logging/index.rst
@@ -25,6 +25,8 @@ Available only with Airflow>=3.0
 
 Airflow can be configured to read task logs from Opensearch and optionally 
write logs to stdout in standard or json format. These logs can later be 
collected and forwarded to the cluster using tools like fluentd, logstash or 
others.
 
+Airflow also supports writing logs to OpenSearch directly without requiring 
additional software like fluentd or logstash. To enable this feature, set 
``write_to_os`` and ``json_format`` to ``True`` and ``write_stdout`` to 
``False`` in ``airflow.cfg``.
+
 You can choose to have all task logs from workers output to the highest parent 
level process, instead of the standard file locations. This allows for some 
additional flexibility in container environments like Kubernetes, where 
container stdout is already being logged to the host nodes. From there a log 
shipping tool can be used to forward them along to Opensearch. To use this 
feature, set the ``write_stdout`` option in ``airflow.cfg``.
 You can also choose to have the logs output in a JSON format, using the 
``json_format`` option. Airflow uses the standard Python logging module and 
JSON fields are directly extracted from the LogRecord object. To use this 
feature, set the ``json_fields`` option in ``airflow.cfg``. Add the fields to 
the comma-delimited string that you want collected for the logs. These fields 
are from the LogRecord object in the ``logging`` module. `Documentation on 
different attributes can be found here  [...]
 
@@ -52,6 +54,24 @@ To output task logs to stdout in JSON format, the following 
config could be used
     write_stdout = True
     json_format = True
 
+To output task logs to OpenSearch directly, the following config could be 
used: (set ``delete_local_logs`` to ``True`` if you do not want to retain a 
local copy of the task log)
+
+.. code-block:: ini
+
+    [logging]
+    remote_logging = True
+    delete_local_logs = False
+
+    [opensearch]
+    host = <host>
+    port = <port>
+    username = <username>
+    password = <password>
+    write_stdout = False
+    json_format = True
+    write_to_os = True
+    target_index = [name of the index to store logs]
+
 .. _write-logs-elasticsearch-tls:
 
 Writing logs to Opensearch over TLS
@@ -60,6 +80,8 @@ Writing logs to Opensearch over TLS
 To add custom configurations to Opensearch (e.g. turning on ``ssl_verify``, 
adding a custom self-signed
 cert, etc.) use the ``opensearch_configs`` setting in your ``airflow.cfg``
 
+Note that these configurations also apply when you enable writing logs to 
OpenSearch directly.
+
 .. code-block:: ini
 
     [logging]
diff --git a/providers/opensearch/provider.yaml 
b/providers/opensearch/provider.yaml
index 26b9c57d8d2..84e71e41813 100644
--- a/providers/opensearch/provider.yaml
+++ b/providers/opensearch/provider.yaml
@@ -144,6 +144,20 @@ config:
         type: string
         example: ~
         default: "False"
+      write_to_os:
+        description: |
+          Write the task logs directly to OpenSearch
+        version_added: 1.9.0
+        type: string
+        example: ~
+        default: "False"
+      target_index:
+        description: |
+          Name of the index to write to when direct OpenSearch log writing is 
enabled
+        version_added: 1.9.0
+        type: string
+        example: ~
+        default: "airflow-logs"
       json_format:
         description: |
           Instead of the default log formatter, write the log lines as JSON
diff --git a/providers/opensearch/pyproject.toml 
b/providers/opensearch/pyproject.toml
index 22f10ad2036..b110dc36cfa 100644
--- a/providers/opensearch/pyproject.toml
+++ b/providers/opensearch/pyproject.toml
@@ -59,7 +59,7 @@ requires-python = ">=3.10"
 # Make sure to run ``prek update-providers-dependencies --all-files``
 # After you modify the dependencies, and rebuild your Breeze CI image with 
``breeze ci-image build``
 dependencies = [
-    "apache-airflow>=2.11.0",
+    "apache-airflow>=3.0.0",
     "apache-airflow-providers-common-compat>=1.12.0",
     "opensearch-py>=2.2.0",
 ]
diff --git a/providers/opensearch/src/airflow/providers/opensearch/__init__.py 
b/providers/opensearch/src/airflow/providers/opensearch/__init__.py
index 275df0392b4..d52ce569a16 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/__init__.py
+++ b/providers/opensearch/src/airflow/providers/opensearch/__init__.py
@@ -32,8 +32,8 @@ __all__ = ["__version__"]
 __version__ = "1.8.5"
 
 if 
packaging.version.parse(packaging.version.parse(airflow_version).base_version) 
< packaging.version.parse(
-    "2.11.0"
+    "3.0.0"
 ):
     raise RuntimeError(
-        f"The package `apache-airflow-providers-opensearch:{__version__}` 
needs Apache Airflow 2.11.0+"
+        f"The package `apache-airflow-providers-opensearch:{__version__}` 
needs Apache Airflow 3.0.0+"
     )
diff --git 
a/providers/opensearch/src/airflow/providers/opensearch/get_provider_info.py 
b/providers/opensearch/src/airflow/providers/opensearch/get_provider_info.py
index 99e439416d3..70a3fac8ec2 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/get_provider_info.py
+++ b/providers/opensearch/src/airflow/providers/opensearch/get_provider_info.py
@@ -116,6 +116,20 @@ def get_provider_info():
                         "example": None,
                         "default": "False",
                     },
+                    "write_to_os": {
+                        "description": "Write the task logs directly to 
OpenSearch\n",
+                        "version_added": "1.9.0",
+                        "type": "string",
+                        "example": None,
+                        "default": "False",
+                    },
+                    "target_index": {
+                        "description": "Name of the index to write to when 
direct OpenSearch log writing is enabled\n",
+                        "version_added": "1.9.0",
+                        "type": "string",
+                        "example": None,
+                        "default": "airflow-logs",
+                    },
                     "json_format": {
                         "description": "Instead of the default log formatter, 
write the log lines as JSON\n",
                         "version_added": "1.5.0",
diff --git 
a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py 
b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
index 05f0ff90cbf..300739b52e5 100644
--- 
a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
+++ 
b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
@@ -18,27 +18,32 @@
 from __future__ import annotations
 
 import contextlib
+import json
 import logging
+import os
 import sys
 import time
 from collections import defaultdict
 from collections.abc import Callable
 from datetime import datetime
 from operator import attrgetter
+from pathlib import Path
 from typing import TYPE_CHECKING, Any, Literal, cast
 from urllib.parse import urlparse
 
+import attrs
 import pendulum
-from opensearchpy import OpenSearch
+from opensearchpy import OpenSearch, helpers
 from opensearchpy.exceptions import NotFoundError
 from sqlalchemy import select
 
+import airflow.logging_config as alc
 from airflow.models import DagRun
 from airflow.providers.common.compat.module_loading import import_string
 from airflow.providers.common.compat.sdk import AirflowException, conf
 from airflow.providers.opensearch.log.os_json_formatter import 
OpensearchJSONFormatter
 from airflow.providers.opensearch.log.os_response import Hit, 
OpensearchResponse
-from airflow.providers.opensearch.version_compat import AIRFLOW_V_3_0_PLUS
+from airflow.providers.opensearch.version_compat import AIRFLOW_V_3_0_PLUS, 
AIRFLOW_V_3_2_PLUS
 from airflow.utils import timezone
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
@@ -46,7 +51,8 @@ from airflow.utils.session import create_session
 
 if TYPE_CHECKING:
     from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
-    from airflow.utils.log.file_task_handler import LogMetadata
+    from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
+    from airflow.utils.log.file_task_handler import LogMessages, LogMetadata, 
LogSourceInfo
 
 if AIRFLOW_V_3_0_PLUS:
     from airflow.utils.log.file_task_handler import StructuredLogMessage
@@ -154,16 +160,88 @@ def get_os_kwargs_from_config() -> dict[str, Any]:
     return kwargs_dict
 
 
+def _format_url(host: str) -> str:
+    """
+    Format the given host string to ensure it starts with 'http' and check if 
it represents a valid URL.
+
+    :params host: The host string to format and check.
+    """
+    parsed_url = urlparse(host)
+
+    if parsed_url.scheme not in ("http", "https"):
+        host = "http://"; + host
+        parsed_url = urlparse(host)
+
+    if not parsed_url.netloc:
+        raise ValueError(f"'{host}' is not a valid URL.")
+
+    return host
+
+
+def _create_opensearch_client(
+    host: str,
+    port: int | None,
+    username: str,
+    password: str,
+    os_kwargs: dict[str, Any],
+) -> OpenSearch:
+    parsed_url = urlparse(_format_url(host))
+    resolved_port = port if port is not None else (parsed_url.port or 9200)
+    return OpenSearch(
+        hosts=[{"host": parsed_url.hostname, "port": resolved_port, "scheme": 
parsed_url.scheme}],
+        http_auth=(username, password),
+        **os_kwargs,
+    )
+
+
+def _render_log_id(
+    log_id_template: str, ti: TaskInstance | TaskInstanceKey | RuntimeTI, 
try_number: int
+) -> str:
+    return log_id_template.format(
+        dag_id=ti.dag_id,
+        task_id=ti.task_id,
+        run_id=getattr(ti, "run_id", ""),
+        try_number=try_number,
+        map_index=getattr(ti, "map_index", ""),
+    )
+
+
+def _resolve_nested(hit: dict[Any, Any], parent_class=None) -> type[Hit]:
+    """
+    Resolve nested hits from OpenSearch by iteratively navigating the 
`_nested` field.
+
+    The result is used to fetch the appropriate document class to handle the 
hit.
+    """
+    doc_class = Hit
+    nested_field = None
+
+    nested_path: list[str] = []
+    nesting = hit["_nested"]
+    while nesting and "field" in nesting:
+        nested_path.append(nesting["field"])
+        nesting = nesting.get("_nested")
+    nested_path_str = ".".join(nested_path)
+
+    if hasattr(parent_class, "_index"):
+        nested_field = parent_class._index.resolve_field(nested_path_str)
+
+    if nested_field is not None:
+        return nested_field._doc_class
+
+    return doc_class
+
+
 class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, 
LoggingMixin):
     """
-    OpensearchTaskHandler is a Python log handler that reads and writes logs 
to OpenSearch.
+    OpensearchTaskHandler is a Python log handler that reads logs from 
OpenSearch.
 
-    Like the ElasticsearchTaskHandler, Airflow itself does not handle the 
indexing of logs.
-    Instead, logs are flushed to local files, and additional software (e.g., 
Filebeat, Logstash)
-    may be required to ship logs to OpenSearch. This handler then enables 
fetching and displaying
-    logs from OpenSearch.
+    Airflow flushes task logs to local files. Additional software setup can 
then ship those
+    logs to OpenSearch. On Airflow 3, this task handler also registers a 
matching
+    ``OpensearchRemoteLogIO`` so the new remote logging path can read from 
OpenSearch too.
+    Airflow can also be configured to write task logs to OpenSearch directly. 
To enable this
+    feature, set ``json_format`` and ``write_to_opensearch`` to ``True``.
 
-    To efficiently query and sort Elasticsearch results, this handler assumes 
each
+    To efficiently query and sort OpenSearch results, this handler assumes each
     log message has a field `log_id` consists of ti primary keys:
     `log_id = {dag_id}-{task_id}-{logical_date}-{try_number}`
     Log messages with specific log_id are sorted based on `offset`,
@@ -180,6 +258,8 @@ class OpensearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMixin)
     :param port: OpenSearch port.
     :param username: Username for OpenSearch authentication.
     :param password: Password for OpenSearch authentication.
+    :param write_to_opensearch: Whether to write logs directly to OpenSearch.
+    :param target_index: Name of the index to write to when direct OpenSearch 
writes are enabled.
     :param host_field: The field name for the host in the logs (default is 
"host").
     :param offset_field: The field name for the log offset (default is 
"offset").
     :param index_patterns: Index pattern or template for storing logs.
@@ -202,17 +282,22 @@ class OpensearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMixin)
         json_format: bool,
         json_fields: str,
         host: str,
-        port: int,
+        port: int | None,
         username: str,
         password: str,
+        write_to_opensearch: bool = False,
+        target_index: str = "airflow-logs",
         host_field: str = "host",
         offset_field: str = "offset",
         index_patterns: str = conf.get("opensearch", "index_patterns", 
fallback="_all"),
         index_patterns_callable: str = conf.get("opensearch", 
"index_patterns_callable", fallback=""),
+        log_id_template: str = conf.get("opensearch", "log_id_template", 
fallback="")
+        or "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
         os_kwargs: dict | None | Literal["default_os_kwargs"] = 
"default_os_kwargs",
         max_bytes: int = 0,
         backup_count: int = 0,
         delay: bool = False,
+        **kwargs,
     ) -> None:
         os_kwargs = os_kwargs or {}
         if os_kwargs == "default_os_kwargs":
@@ -225,23 +310,54 @@ class OpensearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMixin)
         self.mark_end_on_close = True
         self.end_of_log_mark = end_of_log_mark.strip()
         self.write_stdout = write_stdout
+        self.write_to_opensearch = write_to_opensearch
         self.json_format = json_format
         self.json_fields = [label.strip() for label in json_fields.split(",")]
         self.host = self.format_url(host)
         self.host_field = host_field
         self.offset_field = offset_field
+        self.target_index = target_index
         self.index_patterns = index_patterns
         self.index_patterns_callable = index_patterns_callable
         self.context_set = False
-        self.client = OpenSearch(
-            hosts=[{"host": host, "port": port}],
-            http_auth=(username, password),
-            **os_kwargs,
+        self.client = _create_opensearch_client(
+            self.host,
+            port,
+            username,
+            password,
+            cast("dict[str, Any]", os_kwargs),
         )
+        self.delete_local_copy = kwargs.get(
+            "delete_local_copy", conf.getboolean("logging", 
"delete_local_logs")
+        )
+        self.log_id_template = log_id_template
         self.formatter: logging.Formatter
-        self.handler: logging.FileHandler | logging.StreamHandler
+        self.handler: logging.FileHandler | logging.StreamHandler | None = None
         self._doc_type_map: dict[Any, Any] = {}
         self._doc_type: list[Any] = []
+        self.io = OpensearchRemoteLogIO(
+            host=self.host,
+            port=port,
+            username=username,
+            password=password,
+            write_to_opensearch=self.write_to_opensearch,
+            target_index=self.target_index,
+            write_stdout=self.write_stdout,
+            offset_field=self.offset_field,
+            host_field=self.host_field,
+            base_log_folder=base_log_folder,
+            delete_local_copy=self.delete_local_copy,
+            json_format=self.json_format,
+            log_id_template=self.log_id_template,
+        )
+        if AIRFLOW_V_3_0_PLUS:
+            if AIRFLOW_V_3_2_PLUS:
+                from airflow.logging_config import _ActiveLoggingConfig, 
get_remote_task_log
+
+                if get_remote_task_log() is None:
+                    _ActiveLoggingConfig.set(self.io, None)
+            elif alc.REMOTE_TASK_LOG is None:  # type: ignore[attr-defined]
+                alc.REMOTE_TASK_LOG = self.io  # type: ignore[attr-defined]
 
     def set_context(self, ti: TaskInstance, *, identifier: str | None = None) 
-> None:
         """
@@ -342,6 +458,7 @@ class OpensearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMixin)
     def _render_log_id(self, ti: TaskInstance | TaskInstanceKey, try_number: 
int) -> str:
         from airflow.models.taskinstance import TaskInstanceKey
 
+        log_id_template = self.log_id_template
         with create_session() as session:
             if isinstance(ti, TaskInstanceKey):
                 ti = _ensure_ti(ti, session)
@@ -393,9 +510,9 @@ class OpensearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMixin)
 
         offset = metadata["offset"]
         log_id = self._render_log_id(ti, try_number)
-        response = self._os_read(log_id, offset, ti)
+        response = self.io._os_read(log_id, offset, ti)
         if response is not None and response.hits:
-            logs_by_host = self._group_logs_by_host(response)
+            logs_by_host = self.io._group_logs_by_host(response)
             next_offset = attrgetter(self.offset_field)(response[-1])
         else:
             logs_by_host = None
@@ -410,7 +527,7 @@ class OpensearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMixin)
         # have the log uploaded but will not be stored in elasticsearch.
         metadata["end_of_log"] = False
         if logs_by_host:
-            if any(x[-1].message == self.end_of_log_mark for x in 
logs_by_host.values()):
+            if any(self._get_log_message(x[-1]) == self.end_of_log_mark for x 
in logs_by_host.values()):
                 metadata["end_of_log"] = True
 
         cur_ts = pendulum.now()
@@ -446,10 +563,6 @@ class OpensearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMixin)
 
         # If we hit the end of the log, remove the actual end_of_log message
         # to prevent it from showing in the UI.
-        def concat_logs(hits: list[Hit]):
-            log_range = (len(hits) - 1) if hits[-1].message == 
self.end_of_log_mark else len(hits)
-            return "\n".join(self._format_msg(hits[i]) for i in 
range(log_range))
-
         if logs_by_host:
             if AIRFLOW_V_3_0_PLUS:
                 from airflow.utils.log.file_task_handler import 
StructuredLogMessage
@@ -469,9 +582,10 @@ class OpensearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMixin)
                     for hit in hits
                 ]
             else:
-                message = [(host, concat_logs(hits)) for host, hits in 
logs_by_host.items()]  # type: ignore[misc]
+                message = [(host, self.concat_logs(hits)) for host, hits in 
logs_by_host.items()]  # type: ignore[misc]
         else:
             message = []
+            metadata["end_of_log"] = True
         return message, metadata
 
     def _os_read(self, log_id: str, offset: int | str, ti: TaskInstance) -> 
OpensearchResponse | None:
@@ -576,7 +690,7 @@ class OpensearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMixin)
         dt = hit.get("_type")
 
         if "_nested" in hit:
-            doc_class = self._resolve_nested(hit, parent_class)
+            doc_class = _resolve_nested(hit, parent_class)
 
         elif dt in self._doc_type_map:
             doc_class = self._doc_type_map[dt]
@@ -594,32 +708,6 @@ class OpensearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMixin)
         callback: type[Hit] | Callable[..., Any] = getattr(doc_class, 
"from_es", doc_class)
         return callback(hit)
 
-    def _resolve_nested(self, hit: dict[Any, Any], parent_class=None) -> 
type[Hit]:
-        """
-        Resolve nested hits from Elasticsearch by iteratively navigating the 
`_nested` field.
-
-        The result is used to fetch the appropriate document class to handle 
the hit.
-
-        This method can be used with nested Elasticsearch fields which are 
structured
-        as dictionaries with "field" and "_nested" keys.
-        """
-        doc_class = Hit
-
-        nested_path: list[str] = []
-        nesting = hit["_nested"]
-        while nesting and "field" in nesting:
-            nested_path.append(nesting["field"])
-            nesting = nesting.get("_nested")
-        nested_path_str = ".".join(nested_path)
-
-        if hasattr(parent_class, "_index"):
-            nested_field = parent_class._index.resolve_field(nested_path_str)
-
-        if nested_field is not None:
-            return nested_field._doc_class
-
-        return doc_class
-
     def _group_logs_by_host(self, response: OpensearchResponse) -> dict[str, 
list[Hit]]:
         grouped_logs = defaultdict(list)
         for hit in response:
@@ -638,7 +726,18 @@ class OpensearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMixin)
                 )
 
         # Just a safe-guard to preserve backwards-compatibility
-        return hit.message
+        return self._get_log_message(hit)
+
+    def _get_log_message(self, hit: Hit) -> str:
+        if hasattr(hit, "event"):
+            return hit.event
+        if hasattr(hit, "message"):
+            return hit.message
+        return ""
+
+    def concat_logs(self, hits: list[Hit]) -> str:
+        log_range = (len(hits) - 1) if self._get_log_message(hits[-1]) == 
self.end_of_log_mark else len(hits)
+        return "\n".join(self._format_msg(hits[i]) for i in range(log_range))
 
     @property
     def supports_external_link(self) -> bool:
@@ -664,18 +763,198 @@ class OpensearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMixin)
 
     @staticmethod
     def format_url(host: str) -> str:
+        return _format_url(host)
+
+
[email protected](kw_only=True)
+class OpensearchRemoteLogIO(LoggingMixin):  # noqa: D101
+    json_format: bool = False
+    write_stdout: bool = False
+    write_to_opensearch: bool = False
+    delete_local_copy: bool = False
+    host: str = "localhost"
+    port: int | None = 9200
+    username: str = ""
+    password: str = ""
+    host_field: str = "host"
+    target_index: str = "airflow-logs"
+    offset_field: str = "offset"
+    base_log_folder: Path = attrs.field(converter=Path)
+    log_id_template: str = (
+        conf.get("opensearch", "log_id_template", fallback="")
+        or "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
+    )
+
+    processors = ()
+
+    def __attrs_post_init__(self):
+        self.host = _format_url(self.host)
+        self.port = self.port if self.port is not None else 
(urlparse(self.host).port or 9200)
+        self.client = _create_opensearch_client(
+            self.host,
+            self.port,
+            self.username,
+            self.password,
+            get_os_kwargs_from_config(),
+        )
+        self.index_patterns_callable = conf.get("opensearch", 
"index_patterns_callable", fallback="")
+        self.PAGE = 0
+        self.MAX_LINE_PER_PAGE = 1000
+        self.index_patterns = conf.get("opensearch", "index_patterns", 
fallback="_all")
+        self._doc_type_map: dict[Any, Any] = {}
+        self._doc_type: list[Any] = []
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Emit structured task logs to stdout and/or write them directly to 
OpenSearch."""
+        path = Path(path)
+        local_loc = path if path.is_absolute() else 
self.base_log_folder.joinpath(path)
+        if not local_loc.is_file():
+            return
+
+        log_id = _render_log_id(self.log_id_template, ti, ti.try_number)  # 
type: ignore[arg-type]
+        if self.write_stdout or self.write_to_opensearch:
+            log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+
+            if self.write_stdout:
+                for line in log_lines:
+                    sys.stdout.write(json.dumps(line) + "\n")
+                    sys.stdout.flush()
+
+            if self.write_to_opensearch:
+                success = self._write_to_opensearch(log_lines)
+                if success and self.delete_local_copy:
+                    local_loc.unlink(missing_ok=True)
+                    base_dir = self.base_log_folder
+                    parent = local_loc.parent
+                    while parent != base_dir and parent.is_dir():
+                        if any(parent.iterdir()):
+                            break
+                        with contextlib.suppress(OSError):
+                            parent.rmdir()
+                        parent = parent.parent
+
+    def _parse_raw_log(self, log: str, log_id: str) -> list[dict[str, Any]]:
+        parsed_logs = []
+        offset = 1
+        for line in log.split("\n"):
+            if not line.strip():
+                continue
+            try:
+                log_dict = json.loads(line)
+            except json.JSONDecodeError:
+                self.log.warning("Skipping non-JSON log line: %r", line)
+                log_dict = {"event": line}
+            log_dict.update({"log_id": log_id, self.offset_field: offset})
+            offset += 1
+            parsed_logs.append(log_dict)
+        return parsed_logs
+
+    def _write_to_opensearch(self, log_lines: list[dict[str, Any]]) -> bool:
         """
-        Format the given host string to ensure it starts with 'http' and check 
if it represents a valid URL.
+        Write logs to OpenSearch; return `True` on success and `False` on 
failure.
 
-        :params host: The host string to format and check.
+        :param log_lines: The parsed log lines to write to OpenSearch.
         """
-        parsed_url = urlparse(host)
+        bulk_actions = [{"_index": self.target_index, "_source": log} for log 
in log_lines]
+        try:
+            _ = helpers.bulk(self.client, bulk_actions)
+            return True
+        except helpers.BulkIndexError as bie:
+            self.log.exception("Bulk upload failed for %d log(s)", 
len(bie.errors))
+            for error in bie.errors:
+                self.log.exception(error)
+            return False
+        except Exception as e:
+            self.log.exception("Unable to insert logs into OpenSearch. Reason: 
%s", str(e))
+            return False
+
+    def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, 
LogMessages]:
+        log_id = _render_log_id(self.log_id_template, ti, ti.try_number)  # 
type: ignore[arg-type]
+        self.log.info("Reading log %s from Opensearch", log_id)
+        response = self._os_read(log_id, 0, ti)
+        if response is not None and response.hits:
+            logs_by_host = self._group_logs_by_host(response)
+        else:
+            logs_by_host = None
 
-        if parsed_url.scheme not in ("http", "https"):
-            host = "http://"; + host
-            parsed_url = urlparse(host)
+        if logs_by_host is None:
+            missing_log_message = (
+                f"*** Log {log_id} not found in Opensearch. "
+                "If your task started recently, please wait a moment and 
reload this page. "
+                "Otherwise, the logs for this task instance may have been 
removed."
+            )
+            return [], [missing_log_message]
 
-        if not parsed_url.netloc:
-            raise ValueError(f"'{host}' is not a valid URL.")
+        header = list(logs_by_host.keys())
+        message = []
+        for hits in logs_by_host.values():
+            for hit in hits:
+                message.append(json.dumps(_build_log_fields(hit.to_dict())))
+        return header, message
 
-        return host
+    def _os_read(self, log_id: str, offset: int | str, ti: RuntimeTI) -> 
OpensearchResponse | None:
+        """Return the logs matching ``log_id`` in OpenSearch."""
+        query: dict[Any, Any] = {
+            "query": {
+                "bool": {
+                    "filter": [{"range": {self.offset_field: {"gt": 
int(offset)}}}],
+                    "must": [{"match_phrase": {"log_id": log_id}}],
+                }
+            }
+        }
+        index_patterns = self._get_index_patterns(ti)
+        try:
+            max_log_line = self.client.count(index=index_patterns, 
body=query)["count"]
+        except NotFoundError as e:
+            self.log.exception("The target index pattern %s does not exist", 
index_patterns)
+            raise e
+
+        if max_log_line != 0:
+            try:
+                res = self.client.search(
+                    index=index_patterns,
+                    body=query,
+                    sort=[self.offset_field],
+                    size=self.MAX_LINE_PER_PAGE,
+                    from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+                )
+                return OpensearchResponse(self, res)
+            except Exception as err:
+                self.log.exception("Could not read log with log_id: %s. 
Exception: %s", log_id, err)
+
+        return None
+
+    def _get_index_patterns(self, ti: RuntimeTI | None) -> str:
+        if self.index_patterns_callable:
+            self.log.debug("Using index_patterns_callable: %s", 
self.index_patterns_callable)
+            index_pattern_callable_obj = 
import_string(self.index_patterns_callable)
+            return index_pattern_callable_obj(ti)
+        self.log.debug("Using index_patterns: %s", self.index_patterns)
+        return self.index_patterns
+
+    def _group_logs_by_host(self, response: OpensearchResponse) -> dict[str, 
list[Hit]]:
+        grouped_logs = defaultdict(list)
+        for hit in response:
+            key = getattr_nested(hit, self.host_field, None) or self.host
+            grouped_logs[key].append(hit)
+        return grouped_logs
+
+    def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:
+        doc_class = Hit
+        dt = hit.get("_type")
+
+        if "_nested" in hit:
+            doc_class = _resolve_nested(hit, parent_class)
+        elif dt in self._doc_type_map:
+            doc_class = self._doc_type_map[dt]
+        else:
+            for doc_type in self._doc_type:
+                if hasattr(doc_type, "_matches") and doc_type._matches(hit):
+                    doc_class = doc_type
+                    break
+
+        for t in hit.get("inner_hits", ()):
+            hit["inner_hits"][t] = OpensearchResponse(self, 
hit["inner_hits"][t], doc_class=doc_class)
+
+        callback: type[Hit] | Callable[..., Any] = getattr(doc_class, 
"from_es", doc_class)
+        return callback(hit)
diff --git 
a/providers/opensearch/src/airflow/providers/opensearch/version_compat.py 
b/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
index 613a946fc90..e840569a4d8 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
+++ b/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
@@ -34,8 +34,10 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 
 AIRFLOW_V_3_0_PLUS: bool = get_base_airflow_version_tuple() >= (3, 0, 0)
 AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)
+AIRFLOW_V_3_2_PLUS: bool = get_base_airflow_version_tuple() >= (3, 2, 0)
 
 __all__ = [
     "AIRFLOW_V_3_0_PLUS",
     "AIRFLOW_V_3_1_PLUS",
+    "AIRFLOW_V_3_2_PLUS",
 ]
diff --git a/providers/opensearch/src/airflow/providers/opensearch/__init__.py 
b/providers/opensearch/tests/integration/__init__.py
similarity index 52%
copy from providers/opensearch/src/airflow/providers/opensearch/__init__.py
copy to providers/opensearch/tests/integration/__init__.py
index 275df0392b4..5966d6b1d52 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/__init__.py
+++ b/providers/opensearch/tests/integration/__init__.py
@@ -14,26 +14,4 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE
-# OVERWRITTEN WHEN PREPARING DOCUMENTATION FOR THE PACKAGES.
-#
-# IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE
-# `PROVIDER__INIT__PY_TEMPLATE.py.jinja2` IN the 
`dev/breeze/src/airflow_breeze/templates` DIRECTORY
-#
-from __future__ import annotations
-
-import packaging.version
-
-from airflow import __version__ as airflow_version
-
-__all__ = ["__version__"]
-
-__version__ = "1.8.5"
-
-if 
packaging.version.parse(packaging.version.parse(airflow_version).base_version) 
< packaging.version.parse(
-    "2.11.0"
-):
-    raise RuntimeError(
-        f"The package `apache-airflow-providers-opensearch:{__version__}` 
needs Apache Airflow 2.11.0+"
-    )
+__path__ = __import__("pkgutil").extend_path(__path__, __name__)
diff --git a/providers/opensearch/src/airflow/providers/opensearch/__init__.py 
b/providers/opensearch/tests/integration/opensearch/__init__.py
similarity index 52%
copy from providers/opensearch/src/airflow/providers/opensearch/__init__.py
copy to providers/opensearch/tests/integration/opensearch/__init__.py
index 275df0392b4..13a83393a91 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/__init__.py
+++ b/providers/opensearch/tests/integration/opensearch/__init__.py
@@ -14,26 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE
-# OVERWRITTEN WHEN PREPARING DOCUMENTATION FOR THE PACKAGES.
-#
-# IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE
-# `PROVIDER__INIT__PY_TEMPLATE.py.jinja2` IN the 
`dev/breeze/src/airflow_breeze/templates` DIRECTORY
-#
-from __future__ import annotations
-
-import packaging.version
-
-from airflow import __version__ as airflow_version
-
-__all__ = ["__version__"]
-
-__version__ = "1.8.5"
-
-if 
packaging.version.parse(packaging.version.parse(airflow_version).base_version) 
< packaging.version.parse(
-    "2.11.0"
-):
-    raise RuntimeError(
-        f"The package `apache-airflow-providers-opensearch:{__version__}` 
needs Apache Airflow 2.11.0+"
-    )
diff --git a/providers/opensearch/src/airflow/providers/opensearch/__init__.py 
b/providers/opensearch/tests/integration/opensearch/log/__init__.py
similarity index 52%
copy from providers/opensearch/src/airflow/providers/opensearch/__init__.py
copy to providers/opensearch/tests/integration/opensearch/log/__init__.py
index 275df0392b4..13a83393a91 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/__init__.py
+++ b/providers/opensearch/tests/integration/opensearch/log/__init__.py
@@ -14,26 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE
-# OVERWRITTEN WHEN PREPARING DOCUMENTATION FOR THE PACKAGES.
-#
-# IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE
-# `PROVIDER__INIT__PY_TEMPLATE.py.jinja2` IN the 
`dev/breeze/src/airflow_breeze/templates` DIRECTORY
-#
-from __future__ import annotations
-
-import packaging.version
-
-from airflow import __version__ as airflow_version
-
-__all__ = ["__version__"]
-
-__version__ = "1.8.5"
-
-if 
packaging.version.parse(packaging.version.parse(airflow_version).base_version) 
< packaging.version.parse(
-    "2.11.0"
-):
-    raise RuntimeError(
-        f"The package `apache-airflow-providers-opensearch:{__version__}` 
needs Apache Airflow 2.11.0+"
-    )
diff --git 
a/providers/opensearch/tests/integration/opensearch/log/test_os_remote_log_io.py
 
b/providers/opensearch/tests/integration/opensearch/log/test_os_remote_log_io.py
new file mode 100644
index 00000000000..9aef4777618
--- /dev/null
+++ 
b/providers/opensearch/tests/integration/opensearch/log/test_os_remote_log_io.py
@@ -0,0 +1,140 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import dataclasses
+import json
+import uuid
+from unittest.mock import patch
+
+import pytest
+
+from airflow.providers.opensearch.log.os_task_handler import 
OpensearchRemoteLogIO, _render_log_id
+
+opensearchpy = pytest.importorskip("opensearchpy")
+
+# The OpenSearch service hostname as defined in 
scripts/ci/docker-compose/integration-opensearch.yml
+OS_HOST = "http://opensearch";
+
+
[email protected]
+class _MockTI:
+    """Minimal TaskInstance-like object satisfying the RuntimeTI protocol for 
log ID rendering."""
+
+    dag_id: str = "integration_test_dag"
+    task_id: str = "integration_test_task"
+    run_id: str = "integration_test_run"
+    try_number: int = 1
+    map_index: int = -1
+
+
[email protected]("opensearch")
+class TestOpensearchRemoteLogIOIntegration:
+    """
+    Integration tests for OpensearchRemoteLogIO using the breeze opensearch 
service.
+
+    These tests require the opensearch integration to be running:
+        breeze testing providers-integration-tests --integration opensearch
+    """
+
+    @pytest.fixture(autouse=True)
+    def setup(self, tmp_path):
+        self.target_index = f"airflow-logs-{uuid.uuid4().hex}"
+        self.opensearch_io = OpensearchRemoteLogIO(
+            write_to_opensearch=True,
+            write_stdout=False,
+            delete_local_copy=False,
+            host=OS_HOST,
+            port=9200,
+            username="",
+            password="",
+            base_log_folder=tmp_path,
+            target_index=self.target_index,
+        )
+        self.opensearch_io.index_patterns = self.target_index
+        self.opensearch_io.client = opensearchpy.OpenSearch(
+            hosts=[{"host": "opensearch", "port": 9200, "scheme": "http"}]
+        )
+
+    @pytest.fixture
+    def ti(self):
+        return _MockTI()
+
+    @pytest.fixture
+    def tmp_json_log_file(self, tmp_path):
+        log_file = tmp_path / "1.log"
+        sample_logs = [
+            {"message": "start"},
+            {"message": "processing"},
+            {"message": "end"},
+        ]
+        log_file.write_text("\n".join(json.dumps(log) for log in sample_logs) 
+ "\n")
+        return log_file
+
+    @patch(
+        "airflow.providers.opensearch.log.os_task_handler.TASK_LOG_FIELDS",
+        ["message"],
+    )
+    def test_upload_and_read(self, tmp_json_log_file, ti):
+        self.opensearch_io.upload(tmp_json_log_file, ti)
+        self.opensearch_io.client.indices.refresh(index=self.target_index)
+
+        log_source_info, log_messages = self.opensearch_io.read("", ti)
+
+        assert log_source_info[0] == OS_HOST
+        assert len(log_messages) == 3
+
+        expected_messages = ["start", "processing", "end"]
+        for expected, log_message in zip(expected_messages, log_messages):
+            log_entry = json.loads(log_message)
+            assert "event" in log_entry
+            assert log_entry["event"] == expected
+
+    def test_read_missing_log(self, ti):
+        self.opensearch_io.client.indices.create(index=self.target_index)
+
+        log_source_info, log_messages = self.opensearch_io.read("", ti)
+
+        assert log_source_info == []
+        assert len(log_messages) == 1
+        assert "not found in Opensearch" in log_messages[0]
+
+    def test_read_error_detail_integration(self, ti):
+        error_detail = [
+            {
+                "is_cause": False,
+                "frames": [{"filename": "/opt/airflow/dags/fail.py", "lineno": 
13, "name": "log_and_raise"}],
+                "exc_type": "RuntimeError",
+                "exc_value": "Woopsie. Something went wrong.",
+            }
+        ]
+        body = {
+            "event": "Task failed with exception",
+            "log_id": _render_log_id(self.opensearch_io.log_id_template, ti, 
ti.try_number),
+            "offset": 1,
+            "error_detail": error_detail,
+        }
+        self.opensearch_io.client.index(index=self.target_index, body=body)
+        self.opensearch_io.client.indices.refresh(index=self.target_index)
+
+        log_source_info, log_messages = self.opensearch_io.read("", ti)
+
+        assert log_source_info[0] == OS_HOST
+        assert len(log_messages) == 1
+        log_entry = json.loads(log_messages[0])
+        assert "error_detail" in log_entry
+        assert log_entry["error_detail"] == error_detail
diff --git a/providers/opensearch/tests/unit/opensearch/conftest.py 
b/providers/opensearch/tests/unit/opensearch/conftest.py
index d58a4703e76..97256d4ac5d 100644
--- a/providers/opensearch/tests/unit/opensearch/conftest.py
+++ b/providers/opensearch/tests/unit/opensearch/conftest.py
@@ -49,55 +49,6 @@ class MockSearch(OpenSearchHook):
         return doc_id
 
 
-class MockClient:
-    def count(self, index: Any = None, body: Any = None):
-        return {"count": 1, "_shards": {"total": 1, "successful": 1, 
"skipped": 0, "failed": 0}}
-
-    def search(self, index=None, body=None, sort=None, size=None, from_=None):
-        return self.sample_log_response()
-
-    def sample_log_response(self):
-        return {
-            "_shards": {"failed": 0, "skipped": 0, "successful": 7, "total": 
7},
-            "hits": {
-                "hits": [
-                    {
-                        "_id": "jdeZT4kBjAZqZnexVUxk",
-                        "_source": {
-                            "dag_id": "example_bash_operator",
-                            "execution_date": "2023_07_09T07_47_32_000000",
-                            "levelname": "INFO",
-                            "message": "Some Message 1",
-                            "event": "Some Message 1",
-                            "task_id": "run_after_loop",
-                            "try_number": "1",
-                            "offset": 0,
-                        },
-                        "_type": "_doc",
-                    },
-                    {
-                        "_id": "qteZT4kBjAZqZnexVUxl",
-                        "_source": {
-                            "dag_id": "example_bash_operator",
-                            "execution_date": "2023_07_09T07_47_32_000000",
-                            "levelname": "INFO",
-                            "message": "Another Some Message 2",
-                            "event": "Another Some Message 2",
-                            "task_id": "run_after_loop",
-                            "try_number": "1",
-                            "offset": 1,
-                        },
-                        "_type": "_doc",
-                    },
-                ],
-                "max_score": 2.482621,
-                "total": {"relation": "eq", "value": 36},
-            },
-            "timed_out": False,
-            "took": 7,
-        }
-
-
 @pytest.fixture
 def mock_hook(monkeypatch):
     monkeypatch.setattr(OpenSearchHook, "search", MockSearch.search)
diff --git 
a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py 
b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py
index 15aba25ae8b..e66925c7e10 100644
--- a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py
+++ b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py
@@ -17,13 +17,12 @@
 # under the License.
 from __future__ import annotations
 
+import dataclasses
 import json
 import logging
-import os
 import re
-import shutil
 from io import StringIO
-from unittest import mock
+from pathlib import Path
 from unittest.mock import Mock, patch
 
 import pendulum
@@ -31,9 +30,14 @@ import pytest
 from opensearchpy.exceptions import NotFoundError
 
 from airflow.providers.common.compat.sdk import conf
+from airflow.providers.opensearch.log.os_json_formatter import 
OpensearchJSONFormatter
 from airflow.providers.opensearch.log.os_response import OpensearchResponse
 from airflow.providers.opensearch.log.os_task_handler import (
+    OpensearchRemoteLogIO,
     OpensearchTaskHandler,
+    _build_log_fields,
+    _format_error_detail,
+    _render_log_id,
     get_os_kwargs_from_config,
     getattr_nested,
 )
@@ -44,11 +48,19 @@ from airflow.utils.timezone import datetime
 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.db import clear_db_dags, clear_db_runs
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-from unit.opensearch.conftest import MockClient
 
 opensearchpy = pytest.importorskip("opensearchpy")
 
 
[email protected]
+class _MockTI:
+    dag_id: str = "dag_for_testing_os_log_handler"
+    task_id: str = "task_for_testing_os_log_handler"
+    run_id: str = "run_for_testing_os_log_handler"
+    try_number: int = 1
+    map_index: int = -1
+
+
 def get_ti(dag_id, task_id, logical_date, create_task_instance):
     ti = create_task_instance(
         dag_id=dag_id,
@@ -62,6 +74,75 @@ def get_ti(dag_id, task_id, logical_date, 
create_task_instance):
     return ti
 
 
+def _build_os_search_response(*sources: dict, index: str = "test_index", 
doc_type: str = "_doc") -> dict:
+    hits = [
+        {
+            "_id": str(i),
+            "_index": index,
+            "_score": 1.0,
+            "_source": source,
+            "_type": doc_type,
+        }
+        for i, source in enumerate(sources, start=1)
+    ]
+    return {
+        "_shards": {"failed": 0, "skipped": 0, "successful": 1, "total": 1},
+        "hits": {
+            "hits": hits,
+            "max_score": 1.0,
+            "total": {"relation": "eq", "value": len(hits)},
+        },
+        "timed_out": False,
+        "took": 1,
+    }
+
+
+def _make_os_response(search, *sources: dict) -> OpensearchResponse:
+    return OpensearchResponse(search, _build_os_search_response(*sources))
+
+
+def _metadata_from_result(metadatas):
+    return metadatas if AIRFLOW_V_3_0_PLUS else metadatas[0]
+
+
+def _assert_log_events(logs, metadatas, *, expected_events: list[str], 
expected_sources: list[str]):
+    metadata = _metadata_from_result(metadatas)
+    if AIRFLOW_V_3_0_PLUS:
+        logs = list(logs)
+        assert logs[0].event == "::group::Log message source details"
+        assert logs[0].sources == expected_sources
+        assert logs[1].event == "::endgroup::"
+        assert [log.event for log in logs[2:]] == expected_events
+    else:
+        assert len(logs) == 1
+        assert len(logs[0]) == 1
+        assert logs[0][0][0] == expected_sources[0]
+        assert logs[0][0][1] == "\n".join(expected_events)
+    return metadata
+
+
+def _assert_no_logs(logs, metadatas):
+    metadata = _metadata_from_result(metadatas)
+    if AIRFLOW_V_3_0_PLUS:
+        assert logs == []
+    else:
+        assert logs == [[]]
+    return metadata
+
+
+def _assert_missing_log_message(logs):
+    expected_pattern = r"^\*\*\* Log .* not found in Opensearch.*"
+    if AIRFLOW_V_3_0_PLUS:
+        logs = list(logs)
+        assert len(logs) == 1
+        assert logs[0].event is not None
+        assert re.match(expected_pattern, logs[0].event) is not None
+    else:
+        assert len(logs) == 1
+        assert len(logs[0]) == 1
+        assert re.match(expected_pattern, logs[0][0][1]) is not None
+
+
 class TestOpensearchTaskHandler:
     DAG_ID = "dag_for_testing_os_task_handler"
     TASK_ID = "task_for_testing_os_log_handler"
@@ -70,80 +151,83 @@ class TestOpensearchTaskHandler:
     JSON_LOG_ID = 
f"{DAG_ID}-{TASK_ID}-{OpensearchTaskHandler._clean_date(LOGICAL_DATE)}-1"
     FILENAME_TEMPLATE = "{try_number}.log"
 
-    # TODO: Remove when we stop testing for 2.11 compatibility
     @pytest.fixture(autouse=True)
     def _use_historical_filename_templates(self):
         with conf_vars({("core", "use_historical_filename_templates"): 
"True"}):
             yield
 
-    @pytest.fixture
-    def ti(self, create_task_instance, create_log_template):
-        if AIRFLOW_V_3_0_PLUS:
-            create_log_template(self.FILENAME_TEMPLATE, 
"{dag_id}-{task_id}-{logical_date}-{try_number}")
-        else:
-            create_log_template(
-                self.FILENAME_TEMPLATE,
-                "{dag_id}-{task_id}-{execution_date}-{try_number}",
-            )
-        yield get_ti(
-            dag_id=self.DAG_ID,
-            task_id=self.TASK_ID,
-            logical_date=self.LOGICAL_DATE,
-            create_task_instance=create_task_instance,
-        )
-        clear_db_runs()
-        clear_db_dags()
-
-    def setup_method(self):
-        self.local_log_location = "local/log/location"
+    @pytest.fixture(autouse=True)
+    def _setup_handler(self, tmp_path):
+        self.local_log_location = str(tmp_path / "logs")
         self.end_of_log_mark = "end_of_log\n"
         self.write_stdout = False
         self.json_format = False
         self.json_fields = "asctime,filename,lineno,levelname,message,exc_text"
         self.host_field = "host"
         self.offset_field = "offset"
-        self.username = "admin"
-        self.password = "admin"
-        self.host = "localhost"
-        self.port = 9200
+        self.test_message = "Some Message 1"
+        self.base_log_source = {
+            "message": self.test_message,
+            "event": self.test_message,
+            "log_id": self.LOG_ID,
+            "offset": 1,
+        }
         self.os_task_handler = OpensearchTaskHandler(
             base_log_folder=self.local_log_location,
             end_of_log_mark=self.end_of_log_mark,
             write_stdout=self.write_stdout,
-            host=self.host,
-            port=self.port,
-            username=self.username,
-            password=self.password,
+            host="localhost",
+            port=9200,
+            username="admin",
+            password="admin",
             json_format=self.json_format,
             json_fields=self.json_fields,
             host_field=self.host_field,
             offset_field=self.offset_field,
         )
 
-        self.os_task_handler.client = MockClient()
-
-    def teardown_method(self):
-        shutil.rmtree(self.local_log_location.split(os.path.sep)[0], 
ignore_errors=True)
-
-    def test_os_response(self):
-        sample_response = self.os_task_handler.client.sample_log_response()
-        response = OpensearchResponse(self.os_task_handler, sample_response)
-        logs_by_host = self.os_task_handler._group_logs_by_host(response)
-
-        def concat_logs(lines):
-            log_range = -1 if lines[-1].message == 
self.os_task_handler.end_of_log_mark else None
-            return "\n".join(self.os_task_handler._format_msg(line) for line 
in lines[:log_range])
-
-        for hosted_log in logs_by_host.values():
-            message = concat_logs(hosted_log)
+    @pytest.fixture
+    def ti(self, create_task_instance, create_log_template):
+        create_log_template(
+            self.FILENAME_TEMPLATE,
+            (
+                "{dag_id}-{task_id}-{logical_date}-{try_number}"
+                if AIRFLOW_V_3_0_PLUS
+                else "{dag_id}-{task_id}-{execution_date}-{try_number}"
+            ),
+        )
+        yield get_ti(
+            dag_id=self.DAG_ID,
+            task_id=self.TASK_ID,
+            logical_date=self.LOGICAL_DATE,
+            create_task_instance=create_task_instance,
+        )
+        clear_db_runs()
+        clear_db_dags()
 
-        assert message == "Some Message 1\nAnother Some Message 2"
+    @pytest.mark.parametrize(
+        ("host", "expected"),
+        [
+            ("http://localhost";, "http://localhost";),
+            ("https://localhost";, "https://localhost";),
+            ("localhost", "http://localhost";),
+            ("someurl", "http://someurl";),
+            ("https://";, "ValueError"),
+        ],
+    )
+    def test_format_url(self, host, expected):
+        if expected == "ValueError":
+            with pytest.raises(ValueError, match="'https://' is not a valid 
URL."):
+                OpensearchTaskHandler.format_url(host)
+        else:
+            assert OpensearchTaskHandler.format_url(host) == expected
 
     def test_client(self):
+        assert isinstance(self.os_task_handler.client, opensearchpy.OpenSearch)
         assert self.os_task_handler.index_patterns == "_all"
 
     def test_client_with_config(self):
-        config = dict(conf.getsection("opensearch_configs"))
+        os_conf = dict(conf.getsection("opensearch_configs"))
         expected_dict = {
             "http_compress": False,
             "use_ssl": False,
@@ -152,34 +236,32 @@ class TestOpensearchTaskHandler:
             "ssl_show_warn": False,
             "ca_certs": "",
         }
-        assert config == expected_dict
-        # ensure creating with configs does not fail
+        assert os_conf == expected_dict
         OpensearchTaskHandler(
             base_log_folder=self.local_log_location,
             end_of_log_mark=self.end_of_log_mark,
             write_stdout=self.write_stdout,
-            host=self.host,
-            port=self.port,
-            username=self.username,
-            password=self.password,
+            host="localhost",
+            port=9200,
+            username="admin",
+            password="admin",
             json_format=self.json_format,
             json_fields=self.json_fields,
             host_field=self.host_field,
             offset_field=self.offset_field,
-            os_kwargs=config,
+            os_kwargs=os_conf,
         )
 
     def test_client_with_patterns(self):
-        # ensure creating with index patterns does not fail
         patterns = "test_*,other_*"
         handler = OpensearchTaskHandler(
             base_log_folder=self.local_log_location,
             end_of_log_mark=self.end_of_log_mark,
             write_stdout=self.write_stdout,
-            host=self.host,
-            port=self.port,
-            username=self.username,
-            password=self.password,
+            host="localhost",
+            port=9200,
+            username="admin",
+            password="admin",
             json_format=self.json_format,
             json_fields=self.json_fields,
             host_field=self.host_field,
@@ -189,197 +271,113 @@ class TestOpensearchTaskHandler:
         assert handler.index_patterns == patterns
 
     @pytest.mark.db_test
-    def test_read(self, ti):
-        ts = pendulum.now()
-        logs, metadatas = self.os_task_handler.read(
-            ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": 
False}
-        )
+    @pytest.mark.parametrize("metadata_mode", ["provided", "none", "empty"])
+    def test_read(self, ti, metadata_mode):
+        start_time = pendulum.now()
+        response = _make_os_response(self.os_task_handler.io, 
self.base_log_source)
 
-        if AIRFLOW_V_3_0_PLUS:
-            logs = list(logs)
-            expected_msg = "Some Message 1"
-            assert logs[0].event == "::group::Log message source details"
-            assert logs[0].sources == ["http://localhost";]
-            assert logs[1].event == "::endgroup::"
-            assert logs[2].event == expected_msg
-            metadata = metadatas
-        else:
-            expected_msg = "Some Message 1\nAnother Some Message 2"
-            assert len(logs) == 1
-            assert len(logs) == len(metadatas)
-            assert len(logs[0]) == 1
-            assert logs[0][0][-1] == expected_msg
+        with patch.object(self.os_task_handler.io, "_os_read", 
return_value=response):
+            if metadata_mode == "provided":
+                logs, metadatas = self.os_task_handler.read(
+                    ti,
+                    1,
+                    {"offset": 0, "last_log_timestamp": str(start_time), 
"end_of_log": False},
+                )
+            elif metadata_mode == "empty":
+                logs, metadatas = self.os_task_handler.read(ti, 1, {})
+            else:
+                logs, metadatas = self.os_task_handler.read(ti, 1)
 
-            metadata = metadatas[0]
+        metadata = _assert_log_events(
+            logs,
+            metadatas,
+            expected_events=[self.test_message],
+            expected_sources=["http://localhost";],
+        )
 
         assert not metadata["end_of_log"]
-        assert timezone.parse(metadata["last_log_timestamp"]) > ts
+        assert metadata["offset"] == "1"
+        assert timezone.parse(metadata["last_log_timestamp"]) >= start_time
 
     @pytest.mark.db_test
-    def test_read_with_patterns(self, ti):
-        ts = pendulum.now()
-        with mock.patch.object(self.os_task_handler, "index_patterns", 
new="test_*,other_*"):
-            logs, metadatas = self.os_task_handler.read(
-                ti, 1, {"offset": 0, "last_log_timestamp": str(ts), 
"end_of_log": False}
-            )
-
-        if AIRFLOW_V_3_0_PLUS:
-            logs = list(logs)
-            expected_msg = "Some Message 1"
-            assert logs[0].event == "::group::Log message source details"
-            assert logs[0].sources == ["http://localhost";]
-            assert logs[1].event == "::endgroup::"
-            assert logs[2].event == expected_msg
-            metadata = metadatas
-        else:
-            expected_msg = "Some Message 1\nAnother Some Message 2"
-            assert len(logs) == 1
-            assert len(logs) == len(metadatas)
-            assert len(logs[0]) == 1
-            assert logs[0][0][-1] == expected_msg
-
-            metadata = metadatas[0]
+    def test_read_defaults_offset_when_missing_from_metadata(self, ti):
+        start_time = pendulum.now()
+        with patch.object(self.os_task_handler.io, "_os_read", 
return_value=None):
+            logs, metadatas = self.os_task_handler.read(ti, 1, {"end_of_log": 
False})
 
-        assert not metadata["end_of_log"]
-        assert timezone.parse(metadata["last_log_timestamp"]) > ts
+        metadata = _assert_no_logs(logs, metadatas)
+        assert metadata["end_of_log"]
+        assert metadata["offset"] == "0"
+        assert timezone.parse(metadata["last_log_timestamp"]) >= start_time
 
     @pytest.mark.db_test
-    def test_read_with_patterns_no_match(self, ti):
-        ts = pendulum.now()
-        with mock.patch.object(self.os_task_handler, "index_patterns", 
new="test_other_*,test_another_*"):
-            with mock.patch.object(
-                self.os_task_handler.client,
-                "search",
-                return_value={
-                    "_shards": {"failed": 0, "skipped": 0, "successful": 7, 
"total": 7},
-                    "hits": {"hits": []},
-                    "timed_out": False,
-                    "took": 7,
-                },
-            ):
-                logs, metadatas = self.os_task_handler.read(
-                    ti,
-                    1,
-                    {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": 
False},
-                )
-        if AIRFLOW_V_3_0_PLUS:
-            assert logs == []
+    @pytest.mark.parametrize("seconds", [3, 6])
+    def test_read_missing_logs(self, ti, seconds):
+        start_time = pendulum.now().add(seconds=-seconds)
+        with patch.object(self.os_task_handler.io, "_os_read", 
return_value=None):
+            logs, metadatas = self.os_task_handler.read(
+                ti,
+                1,
+                {"offset": 0, "last_log_timestamp": str(start_time), 
"end_of_log": False},
+            )
 
-            metadata = metadatas
+        metadata = _metadata_from_result(metadatas)
+        if seconds > 5:
+            _assert_missing_log_message(logs)
         else:
-            assert len(logs) == 1
-            assert len(logs) == len(metadatas)
-            assert logs == [[]]
-
-            metadata = metadatas[0]
+            _assert_no_logs(logs, metadatas)
 
-        assert not metadata["end_of_log"]
+        assert metadata["end_of_log"]
         assert metadata["offset"] == "0"
-        # last_log_timestamp won't change if no log lines read.
-        assert timezone.parse(metadata["last_log_timestamp"]) == ts
+        assert timezone.parse(metadata["last_log_timestamp"]) == start_time
 
     @pytest.mark.db_test
-    def test_read_with_missing_index(self, ti):
-        ts = pendulum.now()
-        with mock.patch.object(self.os_task_handler, "index_patterns", 
new="nonexistent,test_*"):
-            with mock.patch.object(
-                self.os_task_handler.client,
-                "count",
-                side_effect=NotFoundError(404, "IndexNotFoundError"),
-            ):
-                with pytest.raises(NotFoundError, match=r"IndexNotFoundError"):
-                    self.os_task_handler.read(
-                        ti,
-                        1,
-                        {
-                            "offset": 0,
-                            "last_log_timestamp": str(ts),
-                            "end_of_log": False,
-                        },
-                    )
+    def test_read_timeout(self, ti):
+        start_time = pendulum.now().subtract(minutes=5)
+        with patch.object(self.os_task_handler.io, "_os_read", 
return_value=None):
+            logs, metadatas = self.os_task_handler.read(
+                ti,
+                1,
+                {"offset": 1, "last_log_timestamp": str(start_time), 
"end_of_log": False},
+            )
 
-    @pytest.mark.parametrize("seconds", [3, 6])
-    @pytest.mark.db_test
-    def test_read_missing_logs(self, seconds, create_task_instance):
-        """
-        When the log actually isn't there to be found, we only want to wait 
for 5 seconds.
-        In this case we expect to receive a message of the form 'Log {log_id} 
not found in Opensearch ...'
-        """
-        ti = get_ti(
-            self.DAG_ID,
-            self.TASK_ID,
-            pendulum.instance(self.LOGICAL_DATE).add(days=1),  # so logs are 
not found
-            create_task_instance=create_task_instance,
-        )
-        ts = pendulum.now().add(seconds=-seconds)
-        with mock.patch.object(
-            self.os_task_handler.client,
-            "search",
-            return_value={
-                "_shards": {"failed": 0, "skipped": 0, "successful": 7, 
"total": 7},
-                "hits": {"hits": []},
-                "timed_out": False,
-                "took": 7,
-            },
-        ):
-            logs, metadatas = self.os_task_handler.read(ti, 1, {"offset": 0, 
"last_log_timestamp": str(ts)})
-        if AIRFLOW_V_3_0_PLUS:
-            logs = list(logs)
-            if seconds > 5:
-                # we expect a log not found message when checking began more 
than 5 seconds ago
-                assert len(logs) == 1
-                actual_message = logs[0].event
-                expected_pattern = r"^\*\*\* Log .* not found in Opensearch.*"
-                assert re.match(expected_pattern, actual_message) is not None
-                assert metadatas["end_of_log"] is True
-            else:
-                # we've "waited" less than 5 seconds so it should not be "end 
of log" and should be no log message
-                assert logs == []
-                assert metadatas["end_of_log"] is False
-            assert metadatas["offset"] == "0"
-            assert timezone.parse(metadatas["last_log_timestamp"]) == ts
-        else:
-            assert len(logs) == 1
-            if seconds > 5:
-                # we expect a log not found message when checking began more 
than 5 seconds ago
-                assert len(logs[0]) == 1
-                actual_message = logs[0][0][1]
-                expected_pattern = r"^\*\*\* Log .* not found in Opensearch.*"
-                assert re.match(expected_pattern, actual_message) is not None
-                assert metadatas[0]["end_of_log"] is True
-            else:
-                # we've "waited" less than 5 seconds so it should not be "end 
of log" and should be no log message
-                assert len(logs[0]) == 0
-                assert logs == [[]]
-                assert metadatas[0]["end_of_log"] is False
-            assert len(logs) == len(metadatas)
-            assert metadatas[0]["offset"] == "0"
-            assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts
+        metadata = _assert_no_logs(logs, metadatas)
+        assert metadata["end_of_log"]
+        assert metadata["offset"] == "1"
+        assert timezone.parse(metadata["last_log_timestamp"]) == start_time
 
     @pytest.mark.db_test
-    def test_read_with_none_metadata(self, ti):
-        logs, metadatas = self.os_task_handler.read(ti, 1)
-
-        if AIRFLOW_V_3_0_PLUS:
-            logs = list(logs)
-            expected_message = "Some Message 1"
-            assert logs[0].event == "::group::Log message source details"
-            assert logs[0].sources == ["http://localhost";]
-            assert logs[1].event == "::endgroup::"
-            assert logs[2].event == expected_message
-
-            metadata = metadatas
-        else:
-            expected_message = "Some Message 1\nAnother Some Message 2"
-            assert len(logs) == 1
-            assert len(logs) == len(metadatas)
-            assert len(logs[0]) == 1
-            assert logs[0][0][-1] == expected_message
+    def test_read_with_custom_offset_and_host_fields(self, ti):
+        self.os_task_handler.host_field = "host.name"
+        self.os_task_handler.offset_field = "log.offset"
+        self.os_task_handler.io.host_field = "host.name"
+        self.os_task_handler.io.offset_field = "log.offset"
+        response = _make_os_response(
+            self.os_task_handler.io,
+            {
+                "message": self.test_message,
+                "event": self.test_message,
+                "log_id": self.LOG_ID,
+                "log": {"offset": 1},
+                "host": {"name": "somehostname"},
+            },
+        )
 
-            metadata = metadatas[0]
+        with patch.object(self.os_task_handler.io, "_os_read", 
return_value=response):
+            logs, metadatas = self.os_task_handler.read(
+                ti,
+                1,
+                {"offset": 0, "last_log_timestamp": str(pendulum.now()), 
"end_of_log": False},
+            )
 
+        metadata = _assert_log_events(
+            logs,
+            metadatas,
+            expected_events=[self.test_message],
+            expected_sources=["somehostname"],
+        )
+        assert metadata["offset"] == "1"
         assert not metadata["end_of_log"]
-        assert timezone.parse(metadata["last_log_timestamp"]) < pendulum.now()
 
     @pytest.mark.db_test
     def test_set_context(self, ti):
@@ -388,27 +386,29 @@ class TestOpensearchTaskHandler:
 
     @pytest.mark.db_test
     def test_set_context_w_json_format_and_write_stdout(self, ti):
-        formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s 
- %(message)s")
-        self.os_task_handler.formatter = formatter
+        self.os_task_handler.formatter = logging.Formatter(
+            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+        )
         self.os_task_handler.write_stdout = True
         self.os_task_handler.json_format = True
+
         self.os_task_handler.set_context(ti)
 
+        assert isinstance(self.os_task_handler.formatter, 
OpensearchJSONFormatter)
+        assert isinstance(self.os_task_handler.handler, logging.StreamHandler)
+        assert self.os_task_handler.context_set
+
     @pytest.mark.db_test
     def test_close(self, ti):
-        formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s 
- %(message)s")
-        self.os_task_handler.formatter = formatter
+        self.os_task_handler.formatter = logging.Formatter(
+            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+        )
 
         self.os_task_handler.set_context(ti)
         self.os_task_handler.close()
-        with open(
-            os.path.join(self.local_log_location, 
self.FILENAME_TEMPLATE.format(try_number=1))
-        ) as log_file:
-            # end_of_log_mark may contain characters like '\n' which is needed 
to
-            # have the log uploaded but will not be stored in elasticsearch.
-            # so apply the strip() to log_file.read()
-            log_line = log_file.read().strip()
-            assert log_line.endswith(self.end_of_log_mark.strip())
+
+        log_file = Path(self.local_log_location) / 
self.FILENAME_TEMPLATE.format(try_number=1)
+        assert 
log_file.read_text().strip().endswith(self.end_of_log_mark.strip())
         assert self.os_task_handler.closed
 
     @pytest.mark.db_test
@@ -416,51 +416,34 @@ class TestOpensearchTaskHandler:
         ti.raw = True
         self.os_task_handler.set_context(ti)
         self.os_task_handler.close()
-        with open(
-            os.path.join(self.local_log_location, 
self.FILENAME_TEMPLATE.format(try_number=1))
-        ) as log_file:
-            assert self.end_of_log_mark not in log_file.read()
-        assert self.os_task_handler.closed
 
-    @pytest.mark.db_test
-    def test_close_closed(self, ti):
-        self.os_task_handler.closed = True
-        self.os_task_handler.set_context(ti)
-        self.os_task_handler.close()
-        with open(
-            os.path.join(self.local_log_location, 
self.FILENAME_TEMPLATE.format(try_number=1))
-        ) as log_file:
-            assert len(log_file.read()) == 0
+        log_file = Path(self.local_log_location) / 
self.FILENAME_TEMPLATE.format(try_number=1)
+        assert self.end_of_log_mark not in log_file.read_text()
+        assert self.os_task_handler.closed
 
     @pytest.mark.db_test
     def test_close_with_no_handler(self, ti):
         self.os_task_handler.set_context(ti)
         self.os_task_handler.handler = None
         self.os_task_handler.close()
-        with open(
-            os.path.join(self.local_log_location, 
self.FILENAME_TEMPLATE.format(try_number=1))
-        ) as log_file:
-            assert len(log_file.read()) == 0
+
+        log_file = Path(self.local_log_location) / 
self.FILENAME_TEMPLATE.format(try_number=1)
+        assert log_file.read_text() == ""
         assert self.os_task_handler.closed
 
     @pytest.mark.db_test
-    def test_close_with_no_stream(self, ti):
+    @pytest.mark.parametrize("stream_state", ["none", "closed"])
+    def test_close_reopens_stream(self, ti, stream_state):
         self.os_task_handler.set_context(ti)
-        self.os_task_handler.handler.stream = None
-        self.os_task_handler.close()
-        with open(
-            os.path.join(self.local_log_location, 
self.FILENAME_TEMPLATE.format(try_number=1))
-        ) as log_file:
-            assert self.end_of_log_mark in log_file.read()
-        assert self.os_task_handler.closed
+        if stream_state == "none":
+            self.os_task_handler.handler.stream = None
+        else:
+            self.os_task_handler.handler.stream.close()
 
-        self.os_task_handler.set_context(ti)
-        self.os_task_handler.handler.stream.close()
         self.os_task_handler.close()
-        with open(
-            os.path.join(self.local_log_location, 
self.FILENAME_TEMPLATE.format(try_number=1))
-        ) as log_file:
-            assert self.end_of_log_mark in log_file.read()
+
+        log_file = Path(self.local_log_location) / 
self.FILENAME_TEMPLATE.format(try_number=1)
+        assert self.end_of_log_mark in log_file.read_text()
         assert self.os_task_handler.closed
 
     @pytest.mark.db_test
@@ -471,21 +454,20 @@ class TestOpensearchTaskHandler:
         assert self.os_task_handler._render_log_id(ti, 1) == self.JSON_LOG_ID
 
     def test_clean_date(self):
-        clean_execution_date = self.os_task_handler._clean_date(datetime(2016, 
7, 8, 9, 10, 11, 12))
-        assert clean_execution_date == "2016_07_08T09_10_11_000012"
+        clean_logical_date = OpensearchTaskHandler._clean_date(datetime(2016, 
7, 8, 9, 10, 11, 12))
+        assert clean_logical_date == "2016_07_08T09_10_11_000012"
 
-    @mock.patch("sys.__stdout__", new_callable=StringIO)
     @pytest.mark.db_test
+    @patch("sys.__stdout__", new_callable=StringIO)
     def test_dynamic_offset(self, stdout_mock, ti, time_machine):
-        # arrange
         handler = OpensearchTaskHandler(
             base_log_folder=self.local_log_location,
             end_of_log_mark=self.end_of_log_mark,
             write_stdout=True,
-            host=self.host,
-            port=self.port,
-            username=self.username,
-            password=self.password,
+            host="localhost",
+            port=9200,
+            username="admin",
+            password="admin",
             json_format=True,
             json_fields=self.json_fields,
             host_field=self.host_field,
@@ -493,7 +475,7 @@ class TestOpensearchTaskHandler:
         )
         handler.formatter = logging.Formatter()
 
-        logger = logging.getLogger(__name__)
+        logger = logging.getLogger("tests.opensearch.dynamic_offset")
         logger.handlers = [handler]
         logger.propagate = False
 
@@ -501,17 +483,19 @@ class TestOpensearchTaskHandler:
         handler.set_context(ti)
 
         t1 = pendulum.local(year=2017, month=1, day=1, hour=1, minute=1, 
second=15)
-        t2, t3 = t1 + pendulum.duration(seconds=5), t1 + 
pendulum.duration(seconds=10)
-
-        # act
-        time_machine.move_to(t1, tick=False)
-        ti.log.info("Test")
-        time_machine.move_to(t2, tick=False)
-        ti.log.info("Test2")
-        time_machine.move_to(t3, tick=False)
-        ti.log.info("Test3")
+        t2 = t1 + pendulum.duration(seconds=5)
+        t3 = t1 + pendulum.duration(seconds=10)
+
+        try:
+            time_machine.move_to(t1, tick=False)
+            ti.log.info("Test")
+            time_machine.move_to(t2, tick=False)
+            ti.log.info("Test2")
+            time_machine.move_to(t3, tick=False)
+            ti.log.info("Test3")
+        finally:
+            logger.handlers = []
 
-        # assert
         first_log, second_log, third_log = map(json.loads, 
stdout_mock.getvalue().strip().splitlines())
         assert first_log["offset"] < second_log["offset"] < third_log["offset"]
         assert first_log["asctime"] == t1.format("YYYY-MM-DDTHH:mm:ss.SSSZZ")
@@ -523,81 +507,212 @@ class TestOpensearchTaskHandler:
             mock_callable = Mock(return_value="callable_index_pattern")
             mock_import_string.return_value = mock_callable
 
-            self.os_task_handler.index_patterns_callable = 
"path.to.index_pattern_callable"
-            result = self.os_task_handler._get_index_patterns({})
-
-            
mock_import_string.assert_called_once_with("path.to.index_pattern_callable")
-            mock_callable.assert_called_once_with({})
-            assert result == "callable_index_pattern"
-
-
-def test_safe_attrgetter():
-    class A: ...
-
-    a = A()
-    a.b = "b"
-    a.c = None
-    a.x = a
-    a.x.d = "blah"
-    assert getattr_nested(a, "b", None) == "b"  # regular getattr
-    assert getattr_nested(a, "x.d", None) == "blah"  # nested val
-    assert getattr_nested(a, "aa", "heya") == "heya"  # respects non-none 
default
-    assert getattr_nested(a, "c", "heya") is None  # respects none value
-    assert getattr_nested(a, "aa", None) is None  # respects none default
-
-
-def test_retrieve_config_keys():
-    """
-    Tests that the OpensearchTaskHandler retrieves the correct configuration 
keys from the config file.
-    * old_parameters are removed
-    * parameters from config are automatically added
-    * constructor parameters missing from config are also added
-    :return:
-    """
-    with conf_vars(
-        {
-            ("opensearch_configs", "http_compress"): "False",
-            ("opensearch_configs", "timeout"): "10",
+            self.os_task_handler.io.index_patterns_callable = 
"path.to.index_pattern_callable"
+            result = self.os_task_handler.io._get_index_patterns({})
+
+        
mock_import_string.assert_called_once_with("path.to.index_pattern_callable")
+        mock_callable.assert_called_once_with({})
+        assert result == "callable_index_pattern"
+
+
+class TestTaskHandlerHelpers:
+    def test_safe_attrgetter(self):
+        class A: ...
+
+        a = A()
+        a.b = "b"
+        a.c = None
+        a.x = a
+        a.x.d = "blah"
+        assert getattr_nested(a, "b", None) == "b"
+        assert getattr_nested(a, "x.d", None) == "blah"
+        assert getattr_nested(a, "aa", "heya") == "heya"
+        assert getattr_nested(a, "c", "heya") is None
+        assert getattr_nested(a, "aa", None) is None
+
+    def test_retrieve_config_keys(self):
+        with conf_vars(
+            {
+                ("opensearch_configs", "http_compress"): "False",
+                ("opensearch_configs", "timeout"): "10",
+            }
+        ):
+            args_from_config = get_os_kwargs_from_config().keys()
+            assert "verify_certs" in args_from_config
+            assert "timeout" in args_from_config
+            assert "http_compress" in args_from_config
+            assert "self" not in args_from_config
+
+
+class TestOpensearchRemoteLogIO:
+    @pytest.fixture(autouse=True)
+    def _setup_tests(self, tmp_path):
+        self.opensearch_io = OpensearchRemoteLogIO(
+            write_to_opensearch=True,
+            write_stdout=True,
+            delete_local_copy=True,
+            host="localhost",
+            port=9200,
+            username="admin",
+            password="admin",
+            base_log_folder=tmp_path,
+            
log_id_template="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
+        )
+
+    @pytest.fixture
+    def ti(self):
+        return _MockTI()
+
+    @pytest.fixture
+    def tmp_json_file(self, tmp_path):
+        file_path = tmp_path / "1.log"
+        sample_logs = [
+            {"message": "start"},
+            {"message": "processing"},
+            {"message": "end"},
+        ]
+        file_path.write_text("\n".join(json.dumps(log) for log in sample_logs) 
+ "\n")
+        return file_path
+
+    def test_write_to_stdout(self, tmp_json_file, ti, capsys):
+        self.opensearch_io.write_to_opensearch = False
+        self.opensearch_io.upload(tmp_json_file, ti)
+
+        captured = capsys.readouterr()
+        stdout_lines = captured.out.strip().splitlines()
+        log_entries = [json.loads(line) for line in stdout_lines]
+        assert [entry["message"] for entry in log_entries] == ["start", 
"processing", "end"]
+
+    def test_invalid_task_log_file_path(self, ti):
+        with (
+            patch.object(self.opensearch_io, "_parse_raw_log") as mock_parse,
+            patch.object(self.opensearch_io, "_write_to_opensearch") as 
mock_write,
+        ):
+            self.opensearch_io.upload(Path("/invalid/path"), ti)
+
+        mock_parse.assert_not_called()
+        mock_write.assert_not_called()
+
+    def test_write_to_opensearch(self, tmp_json_file, ti):
+        self.opensearch_io.write_stdout = False
+        log_id = _render_log_id(self.opensearch_io.log_id_template, ti, 
ti.try_number)
+        expected_log_lines = 
self.opensearch_io._parse_raw_log(tmp_json_file.read_text(), log_id)
+
+        with patch.object(self.opensearch_io, "_write_to_opensearch", 
return_value=True) as mock_write:
+            self.opensearch_io.upload(tmp_json_file, ti)
+
+        mock_write.assert_called_once_with(expected_log_lines)
+
+    def test_raw_log_contains_log_id_and_offset(self, tmp_json_file, ti):
+        raw_log = tmp_json_file.read_text()
+        log_id = _render_log_id(self.opensearch_io.log_id_template, ti, 
ti.try_number)
+        json_log_lines = self.opensearch_io._parse_raw_log(raw_log, log_id)
+
+        assert len(json_log_lines) == 3
+        assert [line["offset"] for line in json_log_lines] == [1, 2, 3]
+        assert all(line["log_id"] == log_id for line in json_log_lines)
+
+    def test_os_read_builds_expected_query(self, ti):
+        self.opensearch_io.client = Mock()
+        self.opensearch_io.client.count.return_value = {"count": 1}
+        self.opensearch_io.client.search.return_value = 
_build_os_search_response(
+            {
+                "event": "hello",
+                "log_id": _render_log_id(self.opensearch_io.log_id_template, 
ti, ti.try_number),
+                "offset": 3,
+            }
+        )
+        self.opensearch_io.index_patterns = "airflow-logs-*"
+        log_id = _render_log_id(self.opensearch_io.log_id_template, ti, 
ti.try_number)
+        query = {
+            "query": {
+                "bool": {
+                    "filter": [{"range": {self.opensearch_io.offset_field: 
{"gt": 2}}}],
+                    "must": [{"match_phrase": {"log_id": log_id}}],
+                }
+            }
         }
-    ):
-        args_from_config = get_os_kwargs_from_config().keys()
-        # verify_certs comes from default config value
-        assert "verify_certs" in args_from_config
-        # timeout comes from config provided value
-        assert "timeout" in args_from_config
-        # http_compress comes from config value
-        assert "http_compress" in args_from_config
-        assert "self" not in args_from_config
 
+        response = self.opensearch_io._os_read(log_id, 2, ti)
 
-# ---------------------------------------------------------------------------
-# Tests for the error_detail helpers (issue #63736)
-# ---------------------------------------------------------------------------
+        
self.opensearch_io.client.count.assert_called_once_with(index="airflow-logs-*", 
body=query)
+        self.opensearch_io.client.search.assert_called_once_with(
+            index="airflow-logs-*",
+            body=query,
+            sort=[self.opensearch_io.offset_field],
+            size=self.opensearch_io.MAX_LINE_PER_PAGE,
+            from_=0,
+        )
+        assert response is not None
+        assert response.hits[0].event == "hello"
 
+    def test_os_read_returns_none_when_count_is_zero(self, ti):
+        self.opensearch_io.client = Mock()
+        self.opensearch_io.client.count.return_value = {"count": 0}
 
-class TestFormatErrorDetail:
-    """Unit tests for _format_error_detail."""
+        log_id = _render_log_id(self.opensearch_io.log_id_template, ti, 
ti.try_number)
+        response = self.opensearch_io._os_read(log_id, 0, ti)
 
-    def test_returns_none_for_empty(self):
-        from airflow.providers.opensearch.log.os_task_handler import 
_format_error_detail
+        assert response is None
+        self.opensearch_io.client.search.assert_not_called()
+
+    def test_os_read_propagates_missing_index(self, ti):
+        self.opensearch_io.client = Mock()
+        self.opensearch_io.client.count.side_effect = NotFoundError(
+            404,
+            "IndexMissingException[[missing] missing]",
+        )
+
+        log_id = _render_log_id(self.opensearch_io.log_id_template, ti, 
ti.try_number)
+        with pytest.raises(NotFoundError):
+            self.opensearch_io._os_read(log_id, 0, ti)
+
+    def test_os_read_logs_and_returns_none_on_search_error(self, ti):
+        self.opensearch_io.client = Mock()
+        self.opensearch_io.client.count.return_value = {"count": 1}
+        self.opensearch_io.client.search.side_effect = RuntimeError("boom")
+
+        log_id = _render_log_id(self.opensearch_io.log_id_template, ti, 
ti.try_number)
+        with patch.object(self.opensearch_io.log, "exception") as 
mock_exception:
+            response = self.opensearch_io._os_read(log_id, 0, ti)
+
+        assert response is None
+        mock_exception.assert_called_once()
 
+    def test_read_returns_missing_log_message_when_os_read_returns_none(self, 
ti):
+        with patch.object(self.opensearch_io, "_os_read", return_value=None):
+            log_source_info, log_messages = self.opensearch_io.read("", ti)
+
+        log_id = _render_log_id(self.opensearch_io.log_id_template, ti, 
ti.try_number)
+        assert log_source_info == []
+        assert f"*** Log {log_id} not found in Opensearch" in log_messages[0]
+
+    def test_get_index_patterns_with_callable(self):
+        with 
patch("airflow.providers.opensearch.log.os_task_handler.import_string") as 
mock_import_string:
+            mock_callable = Mock(return_value="callable_index_pattern")
+            mock_import_string.return_value = mock_callable
+
+            self.opensearch_io.index_patterns_callable = 
"path.to.index_pattern_callable"
+            result = self.opensearch_io._get_index_patterns({})
+
+        
mock_import_string.assert_called_once_with("path.to.index_pattern_callable")
+        mock_callable.assert_called_once_with({})
+        assert result == "callable_index_pattern"
+
+
+class TestFormatErrorDetail:
+    def test_returns_none_for_empty(self):
         assert _format_error_detail(None) is None
         assert _format_error_detail([]) is None
 
     def test_returns_string_for_non_list(self):
-        from airflow.providers.opensearch.log.os_task_handler import 
_format_error_detail
-
         assert _format_error_detail("raw string") == "raw string"
 
     def test_formats_single_exception(self):
-        from airflow.providers.opensearch.log.os_task_handler import 
_format_error_detail
-
         error_detail = [
             {
                 "is_cause": False,
-                "frames": [
-                    {"filename": "/app/task.py", "lineno": 13, "name": 
"log_and_raise"},
-                ],
+                "frames": [{"filename": "/app/task.py", "lineno": 13, "name": 
"log_and_raise"}],
                 "exc_type": "RuntimeError",
                 "exc_value": "Something went wrong.",
                 "exceptions": [],
@@ -611,8 +726,6 @@ class TestFormatErrorDetail:
         assert "RuntimeError: Something went wrong." in result
 
     def test_formats_chained_exceptions(self):
-        from airflow.providers.opensearch.log.os_task_handler import 
_format_error_detail
-
         error_detail = [
             {
                 "is_cause": True,
@@ -636,8 +749,6 @@ class TestFormatErrorDetail:
         assert "RuntimeError: wrapped" in result
 
     def test_exc_type_without_value(self):
-        from airflow.providers.opensearch.log.os_task_handler import 
_format_error_detail
-
         error_detail = [
             {
                 "is_cause": False,
@@ -651,19 +762,13 @@ class TestFormatErrorDetail:
         assert result.endswith("StopIteration")
 
     def test_non_dict_items_are_stringified(self):
-        from airflow.providers.opensearch.log.os_task_handler import 
_format_error_detail
-
         result = _format_error_detail(["unexpected string item"])
         assert result is not None
         assert "unexpected string item" in result
 
 
-class TestBuildLogFields:
-    """Unit tests for _build_log_fields."""
-
+class TestBuildStructuredLogFields:
     def test_filters_to_allowed_fields(self):
-        from airflow.providers.opensearch.log.os_task_handler import 
_build_log_fields
-
         hit = {"event": "hello", "level": "info", "unknown_field": "should be 
dropped"}
         result = _build_log_fields(hit)
         assert "event" in result
@@ -671,41 +776,30 @@ class TestBuildLogFields:
         assert "unknown_field" not in result
 
     def test_message_mapped_to_event(self):
-        from airflow.providers.opensearch.log.os_task_handler import 
_build_log_fields
-
         hit = {"message": "plain message", "timestamp": "2024-01-01T00:00:00Z"}
         fields = _build_log_fields(hit)
         assert fields["event"] == "plain message"
-        assert "message" not in fields  # Ensure it is popped if used as event
+        assert "message" not in fields
 
     def test_message_preserved_if_event_exists(self):
-        from airflow.providers.opensearch.log.os_task_handler import 
_build_log_fields
-
         hit = {"event": "structured event", "message": "plain message"}
         fields = _build_log_fields(hit)
         assert fields["event"] == "structured event"
-        # message is preserved if it's in TASK_LOG_FIELDS and doesn't collide 
with event
         assert fields["message"] == "plain message"
 
     def test_levelname_mapped_to_level(self):
-        from airflow.providers.opensearch.log.os_task_handler import 
_build_log_fields
-
         hit = {"event": "msg", "levelname": "ERROR"}
         result = _build_log_fields(hit)
         assert result["level"] == "ERROR"
         assert "levelname" not in result
 
     def test_at_timestamp_mapped_to_timestamp(self):
-        from airflow.providers.opensearch.log.os_task_handler import 
_build_log_fields
-
         hit = {"event": "msg", "@timestamp": "2024-01-01T00:00:00Z"}
         result = _build_log_fields(hit)
         assert result["timestamp"] == "2024-01-01T00:00:00Z"
         assert "@timestamp" not in result
 
     def test_error_detail_is_kept_as_list(self):
-        from airflow.providers.opensearch.log.os_task_handler import 
_build_log_fields
-
         error_detail = [
             {
                 "is_cause": False,
@@ -714,74 +808,11 @@ class TestBuildLogFields:
                 "exc_value": "Woopsie.",
             }
         ]
-        hit = {
-            "event": "Task failed with exception",
-            "error_detail": error_detail,
-        }
+        hit = {"event": "Task failed with exception", "error_detail": 
error_detail}
         result = _build_log_fields(hit)
         assert result["error_detail"] == error_detail
 
     def test_error_detail_dropped_when_empty(self):
-        from airflow.providers.opensearch.log.os_task_handler import 
_build_log_fields
-
         hit = {"event": "msg", "error_detail": []}
         result = _build_log_fields(hit)
         assert "error_detail" not in result
-
-    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage 
only exists in Airflow 3+")
-    def test_read_includes_error_detail_in_structured_message(self):
-        """End-to-end: a hit with error_detail should surface it in the 
returned StructuredLogMessage."""
-        from airflow.providers.opensearch.log.os_task_handler import 
OpensearchTaskHandler
-
-        local_log_location = "local/log/location"
-        handler = OpensearchTaskHandler(
-            base_log_folder=local_log_location,
-            end_of_log_mark="end_of_log\n",
-            write_stdout=False,
-            json_format=False,
-            json_fields="asctime,filename,lineno,levelname,message,exc_text",
-            host="localhost",
-            port=9200,
-            username="admin",
-            password="password",
-        )
-
-        log_id = "test_dag-test_task-test_run--1-1"
-        body = {
-            "event": "Task failed with exception",
-            "log_id": log_id,
-            "offset": 1,
-            "error_detail": [
-                {
-                    "is_cause": False,
-                    "frames": [
-                        {"filename": "/opt/airflow/dags/fail.py", "lineno": 
13, "name": "log_and_raise"}
-                    ],
-                    "exc_type": "RuntimeError",
-                    "exc_value": "Woopsie. Something went wrong.",
-                }
-            ],
-        }
-
-        # Instead of firing up an OpenSearch client, we patch the IO and 
response class
-        mock_hit_dict = body.copy()
-        from airflow.providers.opensearch.log.os_response import Hit, 
OpensearchResponse
-
-        mock_hit = Hit({"_source": mock_hit_dict})
-        mock_response = mock.MagicMock(spec=OpensearchResponse)
-        mock_response.hits = [mock_hit]
-        mock_response.__iter__ = mock.Mock(return_value=iter([mock_hit]))
-        mock_response.__bool__ = mock.Mock(return_value=True)
-        mock_response.__getitem__ = mock.Mock(return_value=mock_hit)
-
-        with mock.patch.object(handler, "_os_read", 
return_value=mock_response):
-            with mock.patch.object(handler, "_group_logs_by_host", 
return_value={"localhost": [mock_hit]}):
-                from airflow.providers.opensearch.log.os_task_handler import 
_build_log_fields
-                from airflow.utils.log.file_task_handler import 
StructuredLogMessage
-
-                fields = _build_log_fields(mock_hit.to_dict())
-                msg = StructuredLogMessage(**fields)
-
-                assert msg.event == "Task failed with exception"
-                assert hasattr(msg, "error_detail")
-                assert msg.error_detail == body["error_detail"]


Reply via email to