This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4b0e876b470 feat: add `write_to_os` writing task logs to opensearch (#64364)
4b0e876b470 is described below
commit 4b0e876b470fcf47933f26898db2bf554c832ab1
Author: Owen Leung <[email protected]>
AuthorDate: Tue Apr 7 21:15:02 2026 +0800
feat: add `write_to_os` writing task logs to opensearch (#64364)
* feat: restore opensearch provider & add write_to_os feature
* test: fix failing CI
* test: revert test info command
* revise for copilot comments
* revert core changes
* fix breeze unit test
* Update providers/opensearch/provider.yaml
* Update providers/opensearch/provider.yaml
* update pyproject toml
* fix failing ci
---------
Co-authored-by: Elad Kalif <[email protected]>
---
dev/breeze/src/airflow_breeze/global_constants.py | 2 +-
.../tests/test_pytest_args_for_test_types.py | 1 +
providers/opensearch/docs/index.rst | 4 +-
providers/opensearch/docs/logging/index.rst | 22 +
providers/opensearch/provider.yaml | 14 +
providers/opensearch/pyproject.toml | 2 +-
.../src/airflow/providers/opensearch/__init__.py | 4 +-
.../providers/opensearch/get_provider_info.py | 14 +
.../providers/opensearch/log/os_task_handler.py | 399 +++++++--
.../airflow/providers/opensearch/version_compat.py | 2 +
.../opensearch => tests/integration}/__init__.py | 24 +-
.../integration}/opensearch/__init__.py | 23 -
.../integration/opensearch/log}/__init__.py | 23 -
.../opensearch/log/test_os_remote_log_io.py | 140 ++++
.../opensearch/tests/unit/opensearch/conftest.py | 49 --
.../unit/opensearch/log/test_os_task_handler.py | 913 +++++++++++----------
16 files changed, 1011 insertions(+), 625 deletions(-)
diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py
index efba238c4de..d7af8a84b14 100644
--- a/dev/breeze/src/airflow_breeze/global_constants.py
+++ b/dev/breeze/src/airflow_breeze/global_constants.py
@@ -767,7 +767,7 @@ PROVIDERS_COMPATIBILITY_TESTS_MATRIX: list[dict[str, str | list[str]]] = [
{
"python-version": "3.10",
"airflow-version": "2.11.1",
- "remove-providers": "common.messaging edge3 fab git keycloak
informatica common.ai",
+ "remove-providers": "common.messaging edge3 fab git keycloak
informatica common.ai opensearch",
"run-unit-tests": "true",
},
{
diff --git a/dev/breeze/tests/test_pytest_args_for_test_types.py b/dev/breeze/tests/test_pytest_args_for_test_types.py
index 91a2a54b328..423c96eb10f 100644
--- a/dev/breeze/tests/test_pytest_args_for_test_types.py
+++ b/dev/breeze/tests/test_pytest_args_for_test_types.py
@@ -70,6 +70,7 @@ def _find_all_integration_folders() -> list[str]:
"providers/microsoft/mssql/tests/integration",
"providers/mongo/tests/integration",
"providers/openlineage/tests/integration",
+ "providers/opensearch/tests/integration",
"providers/qdrant/tests/integration",
"providers/redis/tests/integration",
"providers/trino/tests/integration",
diff --git a/providers/opensearch/docs/index.rst b/providers/opensearch/docs/index.rst
index ad55afc5f52..c339ac6c296 100644
--- a/providers/opensearch/docs/index.rst
+++ b/providers/opensearch/docs/index.rst
@@ -96,12 +96,12 @@ For the minimum Airflow version supported, see ``Requirements`` below.
Requirements
------------
-The minimum Apache Airflow version supported by this provider distribution is ``2.11.0``.
+The minimum Apache Airflow version supported by this provider distribution is ``3.0.0``.
========================================== ==================
PIP package Version required
========================================== ==================
-``apache-airflow`` ``>=2.11.0``
+``apache-airflow`` ``>=3.0.0``
``apache-airflow-providers-common-compat`` ``>=1.12.0``
``opensearch-py`` ``>=2.2.0``
========================================== ==================
diff --git a/providers/opensearch/docs/logging/index.rst b/providers/opensearch/docs/logging/index.rst
index 9e27d8bd66a..22c0314223b 100644
--- a/providers/opensearch/docs/logging/index.rst
+++ b/providers/opensearch/docs/logging/index.rst
@@ -25,6 +25,8 @@ Available only with Airflow>=3.0
Airflow can be configured to read task logs from Opensearch and optionally write logs to stdout in standard or json format. These logs can later be collected and forwarded to the cluster using tools like fluentd, logstash or others.
+Airflow also supports writing logs to OpenSearch directly without requiring additional software like fluentd or logstash. To enable this feature, set ``write_to_os`` and ``json_format`` to ``True`` and ``write_stdout`` to ``False`` in ``airflow.cfg``.
+
You can choose to have all task logs from workers output to the highest parent level process, instead of the standard file locations. This allows for some additional flexibility in container environments like Kubernetes, where container stdout is already being logged to the host nodes. From there a log shipping tool can be used to forward them along to Opensearch. To use this feature, set the ``write_stdout`` option in ``airflow.cfg``.
You can also choose to have the logs output in a JSON format, using the ``json_format`` option. Airflow uses the standard Python logging module and JSON fields are directly extracted from the LogRecord object. To use this feature, set the ``json_fields`` option in ``airflow.cfg``. Add the fields to the comma-delimited string that you want collected for the logs. These fields are from the LogRecord object in the ``logging`` module. `Documentation on different attributes can be found here [...]
@@ -52,6 +54,24 @@ To output task logs to stdout in JSON format, the following config could be used
write_stdout = True
json_format = True
+To output task logs to OpenSearch directly, the following config could be used: (set ``delete_local_logs`` to ``True`` if you do not want to retain a local copy of the task log)
+
+.. code-block:: ini
+
+ [logging]
+ remote_logging = True
+ delete_local_logs = False
+
+ [opensearch]
+ host = <host>
+ port = <port>
+ username = <username>
+ password = <password>
+ write_stdout = False
+ json_format = True
+ write_to_os = True
+ target_index = [name of the index to store logs]
+
.. _write-logs-elasticsearch-tls:
Writing logs to Opensearch over TLS
@@ -60,6 +80,8 @@ Writing logs to Opensearch over TLS
To add custom configurations to Opensearch (e.g. turning on ``ssl_verify``, adding a custom self-signed cert, etc.) use the ``opensearch_configs`` setting in your ``airflow.cfg``
+Note that these configurations also apply when you enable writing logs to OpenSearch directly.
+
.. code-block:: ini
[logging]
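
Stepping back from the docs for a moment: the direct write path added by this commit amounts to parsing the locally flushed JSON-lines task log and bulk-indexing one document per line. A minimal standalone sketch paraphrasing `_parse_raw_log` and `_write_to_opensearch` from `os_task_handler.py` later in this patch (the sample log text, log id, and index name are illustrative):

import json
from typing import Any

def parse_raw_log(log: str, log_id: str, offset_field: str = "offset") -> list[dict[str, Any]]:
    """Turn a JSON-lines task log into documents tagged with log_id and a monotonic offset."""
    parsed: list[dict[str, Any]] = []
    offset = 1
    for line in log.splitlines():
        if not line.strip():
            continue
        try:
            doc = json.loads(line)
        except json.JSONDecodeError:
            doc = {"event": line}  # non-JSON lines are kept verbatim under "event"
        doc.update({"log_id": log_id, offset_field: offset})
        offset += 1
        parsed.append(doc)
    return parsed

docs = parse_raw_log('{"event": "start"}\n{"event": "end"}\n', "mydag-mytask-run1--1-1")
# Each parsed line becomes one bulk action; the real handler feeds these to
# opensearchpy.helpers.bulk(client, actions) and only deletes the local copy on success.
actions = [{"_index": "airflow-logs", "_source": d} for d in docs]
print(actions)
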
diff --git a/providers/opensearch/provider.yaml b/providers/opensearch/provider.yaml
index 26b9c57d8d2..84e71e41813 100644
--- a/providers/opensearch/provider.yaml
+++ b/providers/opensearch/provider.yaml
@@ -144,6 +144,20 @@ config:
type: string
example: ~
default: "False"
+ write_to_os:
+ description: |
+ Write the task logs directly to OpenSearch
+ version_added: 1.9.0
+ type: string
+ example: ~
+ default: "False"
+ target_index:
+ description: |
+ Name of the index to write to when direct OpenSearch log writing is enabled
+ version_added: 1.9.0
+ type: string
+ example: ~
+ default: "airflow-logs"
json_format:
description: |
Instead of the default log formatter, write the log lines as JSON
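
Both new keys surface under the ``[opensearch]`` section of ``airflow.cfg``, so they can be read the usual way; a small sketch assuming an Airflow environment (the handler itself resolves them through ``airflow.providers.common.compat.sdk.conf``):

from airflow.configuration import conf

# Defaults mirror the provider.yaml entries above.
write_to_os = conf.getboolean("opensearch", "write_to_os", fallback=False)
target_index = conf.get("opensearch", "target_index", fallback="airflow-logs")
print(write_to_os, target_index)
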
diff --git a/providers/opensearch/pyproject.toml b/providers/opensearch/pyproject.toml
index 22f10ad2036..b110dc36cfa 100644
--- a/providers/opensearch/pyproject.toml
+++ b/providers/opensearch/pyproject.toml
@@ -59,7 +59,7 @@ requires-python = ">=3.10"
# Make sure to run ``prek update-providers-dependencies --all-files``
# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
dependencies = [
- "apache-airflow>=2.11.0",
+ "apache-airflow>=3.0.0",
"apache-airflow-providers-common-compat>=1.12.0",
"opensearch-py>=2.2.0",
]
diff --git a/providers/opensearch/src/airflow/providers/opensearch/__init__.py b/providers/opensearch/src/airflow/providers/opensearch/__init__.py
index 275df0392b4..d52ce569a16 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/__init__.py
+++ b/providers/opensearch/src/airflow/providers/opensearch/__init__.py
@@ -32,8 +32,8 @@ __all__ = ["__version__"]
__version__ = "1.8.5"
if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
- "2.11.0"
+ "3.0.0"
):
raise RuntimeError(
- f"The package `apache-airflow-providers-opensearch:{__version__}` needs Apache Airflow 2.11.0+"
+ f"The package `apache-airflow-providers-opensearch:{__version__}` needs Apache Airflow 3.0.0+"
)
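
The comparison goes through ``base_version`` so that pre-release and dev builds of Airflow 3 still pass the gate; a quick standalone illustration (the version strings are examples):

import packaging.version

def meets_minimum(airflow_version: str, minimum: str) -> bool:
    # base_version drops dev/rc/post suffixes before comparing, as in the gate above.
    base = packaging.version.parse(airflow_version).base_version
    return packaging.version.parse(base) >= packaging.version.parse(minimum)

print(meets_minimum("3.0.0.dev0", "3.0.0"))  # True: the dev suffix is ignored
print(meets_minimum("2.11.1", "3.0.0"))      # False: would trigger the RuntimeError above
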
diff --git a/providers/opensearch/src/airflow/providers/opensearch/get_provider_info.py b/providers/opensearch/src/airflow/providers/opensearch/get_provider_info.py
index 99e439416d3..70a3fac8ec2 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/get_provider_info.py
+++ b/providers/opensearch/src/airflow/providers/opensearch/get_provider_info.py
@@ -116,6 +116,20 @@ def get_provider_info():
"example": None,
"default": "False",
},
+ "write_to_os": {
+ "description": "Write the task logs directly to
OpenSearch\n",
+ "version_added": "1.9.0",
+ "type": "string",
+ "example": None,
+ "default": "False",
+ },
+ "target_index": {
+ "description": "Name of the index to write to when
direct OpenSearch log writing is enabled\n",
+ "version_added": "1.9.0",
+ "type": "string",
+ "example": None,
+ "default": "airflow-logs",
+ },
"json_format": {
"description": "Instead of the default log formatter,
write the log lines as JSON\n",
"version_added": "1.5.0",
diff --git a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
index 05f0ff90cbf..300739b52e5 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
+++ b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
@@ -18,27 +18,32 @@
from __future__ import annotations
import contextlib
+import json
import logging
+import os
import sys
import time
from collections import defaultdict
from collections.abc import Callable
from datetime import datetime
from operator import attrgetter
+from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, cast
from urllib.parse import urlparse
+import attrs
import pendulum
-from opensearchpy import OpenSearch
+from opensearchpy import OpenSearch, helpers
from opensearchpy.exceptions import NotFoundError
from sqlalchemy import select
+import airflow.logging_config as alc
from airflow.models import DagRun
from airflow.providers.common.compat.module_loading import import_string
from airflow.providers.common.compat.sdk import AirflowException, conf
from airflow.providers.opensearch.log.os_json_formatter import OpensearchJSONFormatter
from airflow.providers.opensearch.log.os_response import Hit, OpensearchResponse
-from airflow.providers.opensearch.version_compat import AIRFLOW_V_3_0_PLUS
+from airflow.providers.opensearch.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
from airflow.utils import timezone
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
@@ -46,7 +51,8 @@ from airflow.utils.session import create_session
if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
- from airflow.utils.log.file_task_handler import LogMetadata
+ from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI
+ from airflow.utils.log.file_task_handler import LogMessages, LogMetadata, LogSourceInfo
if AIRFLOW_V_3_0_PLUS:
from airflow.utils.log.file_task_handler import StructuredLogMessage
@@ -154,16 +160,88 @@ def get_os_kwargs_from_config() -> dict[str, Any]:
return kwargs_dict
+def _format_url(host: str) -> str:
+ """
+ Format the given host string to ensure it starts with 'http' and check if it represents a valid URL.
+
+ :params host: The host string to format and check.
+ """
+ parsed_url = urlparse(host)
+
+ if parsed_url.scheme not in ("http", "https"):
+ host = "http://" + host
+ parsed_url = urlparse(host)
+
+ if not parsed_url.netloc:
+ raise ValueError(f"'{host}' is not a valid URL.")
+
+ return host
+
+
+def _create_opensearch_client(
+ host: str,
+ port: int | None,
+ username: str,
+ password: str,
+ os_kwargs: dict[str, Any],
+) -> OpenSearch:
+ parsed_url = urlparse(_format_url(host))
+ resolved_port = port if port is not None else (parsed_url.port or 9200)
+ return OpenSearch(
+ hosts=[{"host": parsed_url.hostname, "port": resolved_port, "scheme":
parsed_url.scheme}],
+ http_auth=(username, password),
+ **os_kwargs,
+ )
+
+
+def _render_log_id(
+ log_id_template: str, ti: TaskInstance | TaskInstanceKey | RuntimeTI, try_number: int
+) -> str:
+ return log_id_template.format(
+ dag_id=ti.dag_id,
+ task_id=ti.task_id,
+ run_id=getattr(ti, "run_id", ""),
+ try_number=try_number,
+ map_index=getattr(ti, "map_index", ""),
+ )
+
+
+def _resolve_nested(hit: dict[Any, Any], parent_class=None) -> type[Hit]:
+ """
+ Resolve nested hits from OpenSearch by iteratively navigating the `_nested` field.
+
+ The result is used to fetch the appropriate document class to handle the hit.
+ """
+ doc_class = Hit
+ nested_field = None
+
+ nested_path: list[str] = []
+ nesting = hit["_nested"]
+ while nesting and "field" in nesting:
+ nested_path.append(nesting["field"])
+ nesting = nesting.get("_nested")
+ nested_path_str = ".".join(nested_path)
+
+ if hasattr(parent_class, "_index"):
+ nested_field = parent_class._index.resolve_field(nested_path_str)
+
+ if nested_field is not None:
+ return nested_field._doc_class
+
+ return doc_class
+
+
class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
    """
- OpensearchTaskHandler is a Python log handler that reads and writes logs to OpenSearch.
+ OpensearchTaskHandler is a Python log handler that reads logs from OpenSearch.
- Like the ElasticsearchTaskHandler, Airflow itself does not handle the indexing of logs.
- Instead, logs are flushed to local files, and additional software (e.g., Filebeat, Logstash)
- may be required to ship logs to OpenSearch. This handler then enables fetching and displaying
- logs from OpenSearch.
+ Airflow flushes task logs to local files. Additional software setup can then ship those
+ logs to OpenSearch. On Airflow 3, this task handler also registers a matching
+ ``OpensearchRemoteLogIO`` so the new remote logging path can read from OpenSearch too.
+ Airflow can also be configured to write task logs to OpenSearch directly. To enable this
+ feature, set ``json_format`` and ``write_to_opensearch`` to ``True``.
- To efficiently query and sort Elasticsearch results, this handler assumes each
+ To efficiently query and sort OpenSearch results, this handler assumes each
+ To efficiently query and sort OpenSearch results, this handler assumes each
log message has a field `log_id` consists of ti primary keys:
`log_id = {dag_id}-{task_id}-{logical_date}-{try_number}`
Log messages with specific log_id are sorted based on `offset`,
@@ -180,6 +258,8 @@ class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin)
:param port: OpenSearch port.
:param username: Username for OpenSearch authentication.
:param password: Password for OpenSearch authentication.
+ :param write_to_opensearch: Whether to write logs directly to OpenSearch.
+ :param target_index: Name of the index to write to when direct OpenSearch writes are enabled.
:param host_field: The field name for the host in the logs (default is "host").
:param offset_field: The field name for the log offset (default is "offset").
:param index_patterns: Index pattern or template for storing logs.
@@ -202,17 +282,22 @@ class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin)
json_format: bool,
json_fields: str,
host: str,
- port: int,
+ port: int | None,
username: str,
password: str,
+ write_to_opensearch: bool = False,
+ target_index: str = "airflow-logs",
host_field: str = "host",
offset_field: str = "offset",
index_patterns: str = conf.get("opensearch", "index_patterns",
fallback="_all"),
index_patterns_callable: str = conf.get("opensearch",
"index_patterns_callable", fallback=""),
+ log_id_template: str = conf.get("opensearch", "log_id_template",
fallback="")
+ or "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
os_kwargs: dict | None | Literal["default_os_kwargs"] =
"default_os_kwargs",
max_bytes: int = 0,
backup_count: int = 0,
delay: bool = False,
+ **kwargs,
) -> None:
os_kwargs = os_kwargs or {}
if os_kwargs == "default_os_kwargs":
@@ -225,23 +310,54 @@ class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin)
self.mark_end_on_close = True
self.end_of_log_mark = end_of_log_mark.strip()
self.write_stdout = write_stdout
+ self.write_to_opensearch = write_to_opensearch
self.json_format = json_format
self.json_fields = [label.strip() for label in json_fields.split(",")]
self.host = self.format_url(host)
self.host_field = host_field
self.offset_field = offset_field
+ self.target_index = target_index
self.index_patterns = index_patterns
self.index_patterns_callable = index_patterns_callable
self.context_set = False
- self.client = OpenSearch(
- hosts=[{"host": host, "port": port}],
- http_auth=(username, password),
- **os_kwargs,
+ self.client = _create_opensearch_client(
+ self.host,
+ port,
+ username,
+ password,
+ cast("dict[str, Any]", os_kwargs),
)
+ self.delete_local_copy = kwargs.get(
+ "delete_local_copy", conf.getboolean("logging",
"delete_local_logs")
+ )
+ self.log_id_template = log_id_template
self.formatter: logging.Formatter
- self.handler: logging.FileHandler | logging.StreamHandler
+ self.handler: logging.FileHandler | logging.StreamHandler | None = None
self._doc_type_map: dict[Any, Any] = {}
self._doc_type: list[Any] = []
+ self.io = OpensearchRemoteLogIO(
+ host=self.host,
+ port=port,
+ username=username,
+ password=password,
+ write_to_opensearch=self.write_to_opensearch,
+ target_index=self.target_index,
+ write_stdout=self.write_stdout,
+ offset_field=self.offset_field,
+ host_field=self.host_field,
+ base_log_folder=base_log_folder,
+ delete_local_copy=self.delete_local_copy,
+ json_format=self.json_format,
+ log_id_template=self.log_id_template,
+ )
+ if AIRFLOW_V_3_0_PLUS:
+ if AIRFLOW_V_3_2_PLUS:
+ from airflow.logging_config import _ActiveLoggingConfig, get_remote_task_log
+
+ if get_remote_task_log() is None:
+ _ActiveLoggingConfig.set(self.io, None)
+ elif alc.REMOTE_TASK_LOG is None: # type: ignore[attr-defined]
+ alc.REMOTE_TASK_LOG = self.io # type: ignore[attr-defined]
def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None:
"""
@@ -342,6 +458,7 @@ class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin)
def _render_log_id(self, ti: TaskInstance | TaskInstanceKey, try_number: int) -> str:
from airflow.models.taskinstance import TaskInstanceKey
+ log_id_template = self.log_id_template
with create_session() as session:
if isinstance(ti, TaskInstanceKey):
ti = _ensure_ti(ti, session)
@@ -393,9 +510,9 @@ class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin)
offset = metadata["offset"]
log_id = self._render_log_id(ti, try_number)
- response = self._os_read(log_id, offset, ti)
+ response = self.io._os_read(log_id, offset, ti)
if response is not None and response.hits:
- logs_by_host = self._group_logs_by_host(response)
+ logs_by_host = self.io._group_logs_by_host(response)
next_offset = attrgetter(self.offset_field)(response[-1])
else:
logs_by_host = None
@@ -410,7 +527,7 @@ class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin)
# have the log uploaded but will not be stored in elasticsearch.
metadata["end_of_log"] = False
if logs_by_host:
- if any(x[-1].message == self.end_of_log_mark for x in logs_by_host.values()):
+ if any(self._get_log_message(x[-1]) == self.end_of_log_mark for x in logs_by_host.values()):
metadata["end_of_log"] = True
cur_ts = pendulum.now()
@@ -446,10 +563,6 @@ class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin)
# If we hit the end of the log, remove the actual end_of_log message
# to prevent it from showing in the UI.
- def concat_logs(hits: list[Hit]):
- log_range = (len(hits) - 1) if hits[-1].message == self.end_of_log_mark else len(hits)
- return "\n".join(self._format_msg(hits[i]) for i in range(log_range))
-
if logs_by_host:
if AIRFLOW_V_3_0_PLUS:
from airflow.utils.log.file_task_handler import StructuredLogMessage
@@ -469,9 +582,10 @@ class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin)
for hit in hits
]
else:
- message = [(host, concat_logs(hits)) for host, hits in logs_by_host.items()] # type: ignore[misc]
+ message = [(host, self.concat_logs(hits)) for host, hits in logs_by_host.items()] # type: ignore[misc]
else:
message = []
+ metadata["end_of_log"] = True
return message, metadata
def _os_read(self, log_id: str, offset: int | str, ti: TaskInstance) -> OpensearchResponse | None:
@@ -576,7 +690,7 @@ class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin)
dt = hit.get("_type")
if "_nested" in hit:
- doc_class = self._resolve_nested(hit, parent_class)
+ doc_class = _resolve_nested(hit, parent_class)
elif dt in self._doc_type_map:
doc_class = self._doc_type_map[dt]
@@ -594,32 +708,6 @@ class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin)
callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class)
return callback(hit)
- def _resolve_nested(self, hit: dict[Any, Any], parent_class=None) -> type[Hit]:
- """
- Resolve nested hits from Elasticsearch by iteratively navigating the `_nested` field.
-
- The result is used to fetch the appropriate document class to handle the hit.
-
- This method can be used with nested Elasticsearch fields which are structured
- as dictionaries with "field" and "_nested" keys.
- """
- doc_class = Hit
-
- nested_path: list[str] = []
- nesting = hit["_nested"]
- while nesting and "field" in nesting:
- nested_path.append(nesting["field"])
- nesting = nesting.get("_nested")
- nested_path_str = ".".join(nested_path)
-
- if hasattr(parent_class, "_index"):
- nested_field = parent_class._index.resolve_field(nested_path_str)
-
- if nested_field is not None:
- return nested_field._doc_class
-
- return doc_class
-
def _group_logs_by_host(self, response: OpensearchResponse) -> dict[str, list[Hit]]:
grouped_logs = defaultdict(list)
for hit in response:
@@ -638,7 +726,18 @@ class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin)
)
# Just a safe-guard to preserve backwards-compatibility
- return hit.message
+ return self._get_log_message(hit)
+
+ def _get_log_message(self, hit: Hit) -> str:
+ if hasattr(hit, "event"):
+ return hit.event
+ if hasattr(hit, "message"):
+ return hit.message
+ return ""
+
+ def concat_logs(self, hits: list[Hit]) -> str:
+ log_range = (len(hits) - 1) if self._get_log_message(hits[-1]) == self.end_of_log_mark else len(hits)
+ return "\n".join(self._format_msg(hits[i]) for i in range(log_range))
@property
def supports_external_link(self) -> bool:
@@ -664,18 +763,198 @@ class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin)
@staticmethod
def format_url(host: str) -> str:
+ return _format_url(host)
+
+
+@attrs.define(kw_only=True)
+class OpensearchRemoteLogIO(LoggingMixin): # noqa: D101
+ json_format: bool = False
+ write_stdout: bool = False
+ write_to_opensearch: bool = False
+ delete_local_copy: bool = False
+ host: str = "localhost"
+ port: int | None = 9200
+ username: str = ""
+ password: str = ""
+ host_field: str = "host"
+ target_index: str = "airflow-logs"
+ offset_field: str = "offset"
+ base_log_folder: Path = attrs.field(converter=Path)
+ log_id_template: str = (
+ conf.get("opensearch", "log_id_template", fallback="")
+ or "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
+ )
+
+ processors = ()
+
+ def __attrs_post_init__(self):
+ self.host = _format_url(self.host)
+ self.port = self.port if self.port is not None else (urlparse(self.host).port or 9200)
+ self.client = _create_opensearch_client(
+ self.host,
+ self.port,
+ self.username,
+ self.password,
+ get_os_kwargs_from_config(),
+ )
+ self.index_patterns_callable = conf.get("opensearch",
"index_patterns_callable", fallback="")
+ self.PAGE = 0
+ self.MAX_LINE_PER_PAGE = 1000
+ self.index_patterns = conf.get("opensearch", "index_patterns",
fallback="_all")
+ self._doc_type_map: dict[Any, Any] = {}
+ self._doc_type: list[Any] = []
+
+ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+ """Emit structured task logs to stdout and/or write them directly to
OpenSearch."""
+ path = Path(path)
+ local_loc = path if path.is_absolute() else self.base_log_folder.joinpath(path)
+ if not local_loc.is_file():
+ return
+
+ log_id = _render_log_id(self.log_id_template, ti, ti.try_number) # type: ignore[arg-type]
+ if self.write_stdout or self.write_to_opensearch:
+ log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+
+ if self.write_stdout:
+ for line in log_lines:
+ sys.stdout.write(json.dumps(line) + "\n")
+ sys.stdout.flush()
+
+ if self.write_to_opensearch:
+ success = self._write_to_opensearch(log_lines)
+ if success and self.delete_local_copy:
+ local_loc.unlink(missing_ok=True)
+ base_dir = self.base_log_folder
+ parent = local_loc.parent
+ while parent != base_dir and parent.is_dir():
+ if any(parent.iterdir()):
+ break
+ with contextlib.suppress(OSError):
+ parent.rmdir()
+ parent = parent.parent
+
+ def _parse_raw_log(self, log: str, log_id: str) -> list[dict[str, Any]]:
+ parsed_logs = []
+ offset = 1
+ for line in log.split("\n"):
+ if not line.strip():
+ continue
+ try:
+ log_dict = json.loads(line)
+ except json.JSONDecodeError:
+ self.log.warning("Skipping non-JSON log line: %r", line)
+ log_dict = {"event": line}
+ log_dict.update({"log_id": log_id, self.offset_field: offset})
+ offset += 1
+ parsed_logs.append(log_dict)
+ return parsed_logs
+
+ def _write_to_opensearch(self, log_lines: list[dict[str, Any]]) -> bool:
"""
- Format the given host string to ensure it starts with 'http' and check if it represents a valid URL.
+ Write logs to OpenSearch; return `True` on success and `False` on failure.
- :params host: The host string to format and check.
+ :param log_lines: The parsed log lines to write to OpenSearch.
"""
- parsed_url = urlparse(host)
+ bulk_actions = [{"_index": self.target_index, "_source": log} for log
in log_lines]
+ try:
+ _ = helpers.bulk(self.client, bulk_actions)
+ return True
+ except helpers.BulkIndexError as bie:
+ self.log.exception("Bulk upload failed for %d log(s)",
len(bie.errors))
+ for error in bie.errors:
+ self.log.exception(error)
+ return False
+ except Exception as e:
+ self.log.exception("Unable to insert logs into OpenSearch. Reason:
%s", str(e))
+ return False
+
+ def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages]:
+ log_id = _render_log_id(self.log_id_template, ti, ti.try_number) # type: ignore[arg-type]
+ self.log.info("Reading log %s from Opensearch", log_id)
+ response = self._os_read(log_id, 0, ti)
+ if response is not None and response.hits:
+ logs_by_host = self._group_logs_by_host(response)
+ else:
+ logs_by_host = None
- if parsed_url.scheme not in ("http", "https"):
- host = "http://" + host
- parsed_url = urlparse(host)
+ if logs_by_host is None:
+ missing_log_message = (
+ f"*** Log {log_id} not found in Opensearch. "
+ "If your task started recently, please wait a moment and
reload this page. "
+ "Otherwise, the logs for this task instance may have been
removed."
+ )
+ return [], [missing_log_message]
- if not parsed_url.netloc:
- raise ValueError(f"'{host}' is not a valid URL.")
+ header = list(logs_by_host.keys())
+ message = []
+ for hits in logs_by_host.values():
+ for hit in hits:
+ message.append(json.dumps(_build_log_fields(hit.to_dict())))
+ return header, message
- return host
+ def _os_read(self, log_id: str, offset: int | str, ti: RuntimeTI) -> OpensearchResponse | None:
+ """Return the logs matching ``log_id`` in OpenSearch."""
+ query: dict[Any, Any] = {
+ "query": {
+ "bool": {
+ "filter": [{"range": {self.offset_field: {"gt":
int(offset)}}}],
+ "must": [{"match_phrase": {"log_id": log_id}}],
+ }
+ }
+ }
+ index_patterns = self._get_index_patterns(ti)
+ try:
+ max_log_line = self.client.count(index=index_patterns, body=query)["count"]
+ except NotFoundError as e:
+ self.log.exception("The target index pattern %s does not exist",
index_patterns)
+ raise e
+
+ if max_log_line != 0:
+ try:
+ res = self.client.search(
+ index=index_patterns,
+ body=query,
+ sort=[self.offset_field],
+ size=self.MAX_LINE_PER_PAGE,
+ from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+ )
+ return OpensearchResponse(self, res)
+ except Exception as err:
+ self.log.exception("Could not read log with log_id: %s.
Exception: %s", log_id, err)
+
+ return None
+
+ def _get_index_patterns(self, ti: RuntimeTI | None) -> str:
+ if self.index_patterns_callable:
+ self.log.debug("Using index_patterns_callable: %s",
self.index_patterns_callable)
+ index_pattern_callable_obj =
import_string(self.index_patterns_callable)
+ return index_pattern_callable_obj(ti)
+ self.log.debug("Using index_patterns: %s", self.index_patterns)
+ return self.index_patterns
+
+ def _group_logs_by_host(self, response: OpensearchResponse) -> dict[str, list[Hit]]:
+ grouped_logs = defaultdict(list)
+ for hit in response:
+ key = getattr_nested(hit, self.host_field, None) or self.host
+ grouped_logs[key].append(hit)
+ return grouped_logs
+
+ def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:
+ doc_class = Hit
+ dt = hit.get("_type")
+
+ if "_nested" in hit:
+ doc_class = _resolve_nested(hit, parent_class)
+ elif dt in self._doc_type_map:
+ doc_class = self._doc_type_map[dt]
+ else:
+ for doc_type in self._doc_type:
+ if hasattr(doc_type, "_matches") and doc_type._matches(hit):
+ doc_class = doc_type
+ break
+
+ for t in hit.get("inner_hits", ()):
+ hit["inner_hits"][t] = OpensearchResponse(self,
hit["inner_hits"][t], doc_class=doc_class)
+
+ callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class)
+ return callback(hit)
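
To make the two new module-level helpers concrete, here is a self-contained sketch of their observable behavior as defined in the hunks above (inputs are examples):

from urllib.parse import urlparse

def format_url(host: str) -> str:
    # Mirrors _format_url: default to http:// and reject hosts with no netloc.
    parsed = urlparse(host)
    if parsed.scheme not in ("http", "https"):
        host = "http://" + host
        parsed = urlparse(host)
    if not parsed.netloc:
        raise ValueError(f"'{host}' is not a valid URL.")
    return host

print(format_url("opensearch:9200"))  # -> http://opensearch:9200
print(format_url("https://example"))  # -> https://example (unchanged)

# _render_log_id is a plain str.format over task-instance attributes:
template = "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
print(template.format(dag_id="d", task_id="t", run_id="r", map_index=-1, try_number=1))
# -> d-t-r--1-1
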
diff --git a/providers/opensearch/src/airflow/providers/opensearch/version_compat.py b/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
index 613a946fc90..e840569a4d8 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
+++ b/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
@@ -34,8 +34,10 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
AIRFLOW_V_3_0_PLUS: bool = get_base_airflow_version_tuple() >= (3, 0, 0)
AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)
+AIRFLOW_V_3_2_PLUS: bool = get_base_airflow_version_tuple() >= (3, 2, 0)
__all__ = [
"AIRFLOW_V_3_0_PLUS",
"AIRFLOW_V_3_1_PLUS",
+ "AIRFLOW_V_3_2_PLUS",
]
diff --git a/providers/opensearch/src/airflow/providers/opensearch/__init__.py b/providers/opensearch/tests/integration/__init__.py
similarity index 52%
copy from providers/opensearch/src/airflow/providers/opensearch/__init__.py
copy to providers/opensearch/tests/integration/__init__.py
index 275df0392b4..5966d6b1d52 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/__init__.py
+++ b/providers/opensearch/tests/integration/__init__.py
@@ -14,26 +14,4 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
-# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE
-# OVERWRITTEN WHEN PREPARING DOCUMENTATION FOR THE PACKAGES.
-#
-# IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE
-# `PROVIDER__INIT__PY_TEMPLATE.py.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY
-#
-from __future__ import annotations
-
-import packaging.version
-
-from airflow import __version__ as airflow_version
-
-__all__ = ["__version__"]
-
-__version__ = "1.8.5"
-
-if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
- "2.11.0"
-):
- raise RuntimeError(
- f"The package `apache-airflow-providers-opensearch:{__version__}`
needs Apache Airflow 2.11.0+"
- )
+__path__ = __import__("pkgutil").extend_path(__path__, __name__)
diff --git a/providers/opensearch/src/airflow/providers/opensearch/__init__.py b/providers/opensearch/tests/integration/opensearch/__init__.py
similarity index 52%
copy from providers/opensearch/src/airflow/providers/opensearch/__init__.py
copy to providers/opensearch/tests/integration/opensearch/__init__.py
index 275df0392b4..13a83393a91 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/__init__.py
+++ b/providers/opensearch/tests/integration/opensearch/__init__.py
@@ -14,26 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
-# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE
-# OVERWRITTEN WHEN PREPARING DOCUMENTATION FOR THE PACKAGES.
-#
-# IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE
-# `PROVIDER__INIT__PY_TEMPLATE.py.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY
-#
-from __future__ import annotations
-
-import packaging.version
-
-from airflow import __version__ as airflow_version
-
-__all__ = ["__version__"]
-
-__version__ = "1.8.5"
-
-if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
- "2.11.0"
-):
- raise RuntimeError(
- f"The package `apache-airflow-providers-opensearch:{__version__}`
needs Apache Airflow 2.11.0+"
- )
diff --git a/providers/opensearch/src/airflow/providers/opensearch/__init__.py b/providers/opensearch/tests/integration/opensearch/log/__init__.py
similarity index 52%
copy from providers/opensearch/src/airflow/providers/opensearch/__init__.py
copy to providers/opensearch/tests/integration/opensearch/log/__init__.py
index 275df0392b4..13a83393a91 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/__init__.py
+++ b/providers/opensearch/tests/integration/opensearch/log/__init__.py
@@ -14,26 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
-# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE
-# OVERWRITTEN WHEN PREPARING DOCUMENTATION FOR THE PACKAGES.
-#
-# IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE
-# `PROVIDER__INIT__PY_TEMPLATE.py.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY
-#
-from __future__ import annotations
-
-import packaging.version
-
-from airflow import __version__ as airflow_version
-
-__all__ = ["__version__"]
-
-__version__ = "1.8.5"
-
-if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
- "2.11.0"
-):
- raise RuntimeError(
- f"The package `apache-airflow-providers-opensearch:{__version__}`
needs Apache Airflow 2.11.0+"
- )
diff --git a/providers/opensearch/tests/integration/opensearch/log/test_os_remote_log_io.py b/providers/opensearch/tests/integration/opensearch/log/test_os_remote_log_io.py
new file mode 100644
index 00000000000..9aef4777618
--- /dev/null
+++ b/providers/opensearch/tests/integration/opensearch/log/test_os_remote_log_io.py
@@ -0,0 +1,140 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import dataclasses
+import json
+import uuid
+from unittest.mock import patch
+
+import pytest
+
+from airflow.providers.opensearch.log.os_task_handler import OpensearchRemoteLogIO, _render_log_id
+
+opensearchpy = pytest.importorskip("opensearchpy")
+
+# The OpenSearch service hostname as defined in scripts/ci/docker-compose/integration-opensearch.yml
+OS_HOST = "http://opensearch"
+
+
+@dataclasses.dataclass
+class _MockTI:
+ """Minimal TaskInstance-like object satisfying the RuntimeTI protocol for
log ID rendering."""
+
+ dag_id: str = "integration_test_dag"
+ task_id: str = "integration_test_task"
+ run_id: str = "integration_test_run"
+ try_number: int = 1
+ map_index: int = -1
+
+
+@pytest.mark.integration("opensearch")
+class TestOpensearchRemoteLogIOIntegration:
+ """
+ Integration tests for OpensearchRemoteLogIO using the breeze opensearch service.
+
+ These tests require the opensearch integration to be running:
+ breeze testing providers-integration-tests --integration opensearch
+ """
+
+ @pytest.fixture(autouse=True)
+ def setup(self, tmp_path):
+ self.target_index = f"airflow-logs-{uuid.uuid4().hex}"
+ self.opensearch_io = OpensearchRemoteLogIO(
+ write_to_opensearch=True,
+ write_stdout=False,
+ delete_local_copy=False,
+ host=OS_HOST,
+ port=9200,
+ username="",
+ password="",
+ base_log_folder=tmp_path,
+ target_index=self.target_index,
+ )
+ self.opensearch_io.index_patterns = self.target_index
+ self.opensearch_io.client = opensearchpy.OpenSearch(
+ hosts=[{"host": "opensearch", "port": 9200, "scheme": "http"}]
+ )
+
+ @pytest.fixture
+ def ti(self):
+ return _MockTI()
+
+ @pytest.fixture
+ def tmp_json_log_file(self, tmp_path):
+ log_file = tmp_path / "1.log"
+ sample_logs = [
+ {"message": "start"},
+ {"message": "processing"},
+ {"message": "end"},
+ ]
+ log_file.write_text("\n".join(json.dumps(log) for log in sample_logs)
+ "\n")
+ return log_file
+
+ @patch(
+ "airflow.providers.opensearch.log.os_task_handler.TASK_LOG_FIELDS",
+ ["message"],
+ )
+ def test_upload_and_read(self, tmp_json_log_file, ti):
+ self.opensearch_io.upload(tmp_json_log_file, ti)
+ self.opensearch_io.client.indices.refresh(index=self.target_index)
+
+ log_source_info, log_messages = self.opensearch_io.read("", ti)
+
+ assert log_source_info[0] == OS_HOST
+ assert len(log_messages) == 3
+
+ expected_messages = ["start", "processing", "end"]
+ for expected, log_message in zip(expected_messages, log_messages):
+ log_entry = json.loads(log_message)
+ assert "event" in log_entry
+ assert log_entry["event"] == expected
+
+ def test_read_missing_log(self, ti):
+ self.opensearch_io.client.indices.create(index=self.target_index)
+
+ log_source_info, log_messages = self.opensearch_io.read("", ti)
+
+ assert log_source_info == []
+ assert len(log_messages) == 1
+ assert "not found in Opensearch" in log_messages[0]
+
+ def test_read_error_detail_integration(self, ti):
+ error_detail = [
+ {
+ "is_cause": False,
+ "frames": [{"filename": "/opt/airflow/dags/fail.py", "lineno":
13, "name": "log_and_raise"}],
+ "exc_type": "RuntimeError",
+ "exc_value": "Woopsie. Something went wrong.",
+ }
+ ]
+ body = {
+ "event": "Task failed with exception",
+ "log_id": _render_log_id(self.opensearch_io.log_id_template, ti,
ti.try_number),
+ "offset": 1,
+ "error_detail": error_detail,
+ }
+ self.opensearch_io.client.index(index=self.target_index, body=body)
+ self.opensearch_io.client.indices.refresh(index=self.target_index)
+
+ log_source_info, log_messages = self.opensearch_io.read("", ti)
+
+ assert log_source_info[0] == OS_HOST
+ assert len(log_messages) == 1
+ log_entry = json.loads(log_messages[0])
+ assert "error_detail" in log_entry
+ assert log_entry["error_detail"] == error_detail
diff --git a/providers/opensearch/tests/unit/opensearch/conftest.py b/providers/opensearch/tests/unit/opensearch/conftest.py
index d58a4703e76..97256d4ac5d 100644
--- a/providers/opensearch/tests/unit/opensearch/conftest.py
+++ b/providers/opensearch/tests/unit/opensearch/conftest.py
@@ -49,55 +49,6 @@ class MockSearch(OpenSearchHook):
return doc_id
-class MockClient:
- def count(self, index: Any = None, body: Any = None):
- return {"count": 1, "_shards": {"total": 1, "successful": 1,
"skipped": 0, "failed": 0}}
-
- def search(self, index=None, body=None, sort=None, size=None, from_=None):
- return self.sample_log_response()
-
- def sample_log_response(self):
- return {
- "_shards": {"failed": 0, "skipped": 0, "successful": 7, "total":
7},
- "hits": {
- "hits": [
- {
- "_id": "jdeZT4kBjAZqZnexVUxk",
- "_source": {
- "dag_id": "example_bash_operator",
- "execution_date": "2023_07_09T07_47_32_000000",
- "levelname": "INFO",
- "message": "Some Message 1",
- "event": "Some Message 1",
- "task_id": "run_after_loop",
- "try_number": "1",
- "offset": 0,
- },
- "_type": "_doc",
- },
- {
- "_id": "qteZT4kBjAZqZnexVUxl",
- "_source": {
- "dag_id": "example_bash_operator",
- "execution_date": "2023_07_09T07_47_32_000000",
- "levelname": "INFO",
- "message": "Another Some Message 2",
- "event": "Another Some Message 2",
- "task_id": "run_after_loop",
- "try_number": "1",
- "offset": 1,
- },
- "_type": "_doc",
- },
- ],
- "max_score": 2.482621,
- "total": {"relation": "eq", "value": 36},
- },
- "timed_out": False,
- "took": 7,
- }
-
-
@pytest.fixture
def mock_hook(monkeypatch):
monkeypatch.setattr(OpenSearchHook, "search", MockSearch.search)
diff --git a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py
index 15aba25ae8b..e66925c7e10 100644
--- a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py
+++ b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py
@@ -17,13 +17,12 @@
# under the License.
from __future__ import annotations
+import dataclasses
import json
import logging
-import os
import re
-import shutil
from io import StringIO
-from unittest import mock
+from pathlib import Path
from unittest.mock import Mock, patch
import pendulum
@@ -31,9 +30,14 @@ import pytest
from opensearchpy.exceptions import NotFoundError
from airflow.providers.common.compat.sdk import conf
+from airflow.providers.opensearch.log.os_json_formatter import OpensearchJSONFormatter
from airflow.providers.opensearch.log.os_response import OpensearchResponse
from airflow.providers.opensearch.log.os_task_handler import (
+ OpensearchRemoteLogIO,
OpensearchTaskHandler,
+ _build_log_fields,
+ _format_error_detail,
+ _render_log_id,
get_os_kwargs_from_config,
getattr_nested,
)
@@ -44,11 +48,19 @@ from airflow.utils.timezone import datetime
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_dags, clear_db_runs
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-from unit.opensearch.conftest import MockClient
opensearchpy = pytest.importorskip("opensearchpy")
+@dataclasses.dataclass
+class _MockTI:
+ dag_id: str = "dag_for_testing_os_log_handler"
+ task_id: str = "task_for_testing_os_log_handler"
+ run_id: str = "run_for_testing_os_log_handler"
+ try_number: int = 1
+ map_index: int = -1
+
+
def get_ti(dag_id, task_id, logical_date, create_task_instance):
ti = create_task_instance(
dag_id=dag_id,
@@ -62,6 +74,75 @@ def get_ti(dag_id, task_id, logical_date, create_task_instance):
return ti
+def _build_os_search_response(*sources: dict, index: str = "test_index", doc_type: str = "_doc") -> dict:
+ hits = [
+ {
+ "_id": str(i),
+ "_index": index,
+ "_score": 1.0,
+ "_source": source,
+ "_type": doc_type,
+ }
+ for i, source in enumerate(sources, start=1)
+ ]
+ return {
+ "_shards": {"failed": 0, "skipped": 0, "successful": 1, "total": 1},
+ "hits": {
+ "hits": hits,
+ "max_score": 1.0,
+ "total": {"relation": "eq", "value": len(hits)},
+ },
+ "timed_out": False,
+ "took": 1,
+ }
+
+
+def _make_os_response(search, *sources: dict) -> OpensearchResponse:
+ return OpensearchResponse(search, _build_os_search_response(*sources))
+
+
+def _metadata_from_result(metadatas):
+ return metadatas if AIRFLOW_V_3_0_PLUS else metadatas[0]
+
+
+def _assert_log_events(logs, metadatas, *, expected_events: list[str], expected_sources: list[str]):
+ metadata = _metadata_from_result(metadatas)
+ if AIRFLOW_V_3_0_PLUS:
+ logs = list(logs)
+ assert logs[0].event == "::group::Log message source details"
+ assert logs[0].sources == expected_sources
+ assert logs[1].event == "::endgroup::"
+ assert [log.event for log in logs[2:]] == expected_events
+ else:
+ assert len(logs) == 1
+ assert len(logs[0]) == 1
+ assert logs[0][0][0] == expected_sources[0]
+ assert logs[0][0][1] == "\n".join(expected_events)
+ return metadata
+
+
+def _assert_no_logs(logs, metadatas):
+ metadata = _metadata_from_result(metadatas)
+ if AIRFLOW_V_3_0_PLUS:
+ assert logs == []
+ else:
+ assert logs == [[]]
+ return metadata
+
+
+def _assert_missing_log_message(logs):
+ expected_pattern = r"^\*\*\* Log .* not found in Opensearch.*"
+ if AIRFLOW_V_3_0_PLUS:
+ logs = list(logs)
+ assert len(logs) == 1
+ assert logs[0].event is not None
+ assert re.match(expected_pattern, logs[0].event) is not None
+ else:
+ assert len(logs) == 1
+ assert len(logs[0]) == 1
+ assert re.match(expected_pattern, logs[0][0][1]) is not None
+
+
class TestOpensearchTaskHandler:
DAG_ID = "dag_for_testing_os_task_handler"
TASK_ID = "task_for_testing_os_log_handler"
@@ -70,80 +151,83 @@ class TestOpensearchTaskHandler:
JSON_LOG_ID = f"{DAG_ID}-{TASK_ID}-{OpensearchTaskHandler._clean_date(LOGICAL_DATE)}-1"
FILENAME_TEMPLATE = "{try_number}.log"
- # TODO: Remove when we stop testing for 2.11 compatibility
@pytest.fixture(autouse=True)
def _use_historical_filename_templates(self):
with conf_vars({("core", "use_historical_filename_templates"):
"True"}):
yield
- @pytest.fixture
- def ti(self, create_task_instance, create_log_template):
- if AIRFLOW_V_3_0_PLUS:
- create_log_template(self.FILENAME_TEMPLATE, "{dag_id}-{task_id}-{logical_date}-{try_number}")
- else:
- create_log_template(
- self.FILENAME_TEMPLATE,
- "{dag_id}-{task_id}-{execution_date}-{try_number}",
- )
- yield get_ti(
- dag_id=self.DAG_ID,
- task_id=self.TASK_ID,
- logical_date=self.LOGICAL_DATE,
- create_task_instance=create_task_instance,
- )
- clear_db_runs()
- clear_db_dags()
-
- def setup_method(self):
- self.local_log_location = "local/log/location"
+ @pytest.fixture(autouse=True)
+ def _setup_handler(self, tmp_path):
+ self.local_log_location = str(tmp_path / "logs")
self.end_of_log_mark = "end_of_log\n"
self.write_stdout = False
self.json_format = False
self.json_fields = "asctime,filename,lineno,levelname,message,exc_text"
self.host_field = "host"
self.offset_field = "offset"
- self.username = "admin"
- self.password = "admin"
- self.host = "localhost"
- self.port = 9200
+ self.test_message = "Some Message 1"
+ self.base_log_source = {
+ "message": self.test_message,
+ "event": self.test_message,
+ "log_id": self.LOG_ID,
+ "offset": 1,
+ }
self.os_task_handler = OpensearchTaskHandler(
base_log_folder=self.local_log_location,
end_of_log_mark=self.end_of_log_mark,
write_stdout=self.write_stdout,
- host=self.host,
- port=self.port,
- username=self.username,
- password=self.password,
+ host="localhost",
+ port=9200,
+ username="admin",
+ password="admin",
json_format=self.json_format,
json_fields=self.json_fields,
host_field=self.host_field,
offset_field=self.offset_field,
)
- self.os_task_handler.client = MockClient()
-
- def teardown_method(self):
- shutil.rmtree(self.local_log_location.split(os.path.sep)[0], ignore_errors=True)
-
- def test_os_response(self):
- sample_response = self.os_task_handler.client.sample_log_response()
- response = OpensearchResponse(self.os_task_handler, sample_response)
- logs_by_host = self.os_task_handler._group_logs_by_host(response)
-
- def concat_logs(lines):
- log_range = -1 if lines[-1].message == self.os_task_handler.end_of_log_mark else None
- return "\n".join(self.os_task_handler._format_msg(line) for line in lines[:log_range])
-
- for hosted_log in logs_by_host.values():
- message = concat_logs(hosted_log)
+ @pytest.fixture
+ def ti(self, create_task_instance, create_log_template):
+ create_log_template(
+ self.FILENAME_TEMPLATE,
+ (
+ "{dag_id}-{task_id}-{logical_date}-{try_number}"
+ if AIRFLOW_V_3_0_PLUS
+ else "{dag_id}-{task_id}-{execution_date}-{try_number}"
+ ),
+ )
+ yield get_ti(
+ dag_id=self.DAG_ID,
+ task_id=self.TASK_ID,
+ logical_date=self.LOGICAL_DATE,
+ create_task_instance=create_task_instance,
+ )
+ clear_db_runs()
+ clear_db_dags()
- assert message == "Some Message 1\nAnother Some Message 2"
+ @pytest.mark.parametrize(
+ ("host", "expected"),
+ [
+ ("http://localhost", "http://localhost"),
+ ("https://localhost", "https://localhost"),
+ ("localhost", "http://localhost"),
+ ("someurl", "http://someurl"),
+ ("https://", "ValueError"),
+ ],
+ )
+ def test_format_url(self, host, expected):
+ if expected == "ValueError":
+ with pytest.raises(ValueError, match="'https://' is not a valid URL."):
+ OpensearchTaskHandler.format_url(host)
+ else:
+ assert OpensearchTaskHandler.format_url(host) == expected
def test_client(self):
+ assert isinstance(self.os_task_handler.client, opensearchpy.OpenSearch)
assert self.os_task_handler.index_patterns == "_all"
def test_client_with_config(self):
- config = dict(conf.getsection("opensearch_configs"))
+ os_conf = dict(conf.getsection("opensearch_configs"))
expected_dict = {
"http_compress": False,
"use_ssl": False,
@@ -152,34 +236,32 @@ class TestOpensearchTaskHandler:
"ssl_show_warn": False,
"ca_certs": "",
}
- assert config == expected_dict
- # ensure creating with configs does not fail
+ assert os_conf == expected_dict
OpensearchTaskHandler(
base_log_folder=self.local_log_location,
end_of_log_mark=self.end_of_log_mark,
write_stdout=self.write_stdout,
- host=self.host,
- port=self.port,
- username=self.username,
- password=self.password,
+ host="localhost",
+ port=9200,
+ username="admin",
+ password="admin",
json_format=self.json_format,
json_fields=self.json_fields,
host_field=self.host_field,
offset_field=self.offset_field,
- os_kwargs=config,
+ os_kwargs=os_conf,
)
def test_client_with_patterns(self):
- # ensure creating with index patterns does not fail
patterns = "test_*,other_*"
handler = OpensearchTaskHandler(
base_log_folder=self.local_log_location,
end_of_log_mark=self.end_of_log_mark,
write_stdout=self.write_stdout,
- host=self.host,
- port=self.port,
- username=self.username,
- password=self.password,
+ host="localhost",
+ port=9200,
+ username="admin",
+ password="admin",
json_format=self.json_format,
json_fields=self.json_fields,
host_field=self.host_field,
@@ -189,197 +271,113 @@ class TestOpensearchTaskHandler:
assert handler.index_patterns == patterns
@pytest.mark.db_test
- def test_read(self, ti):
- ts = pendulum.now()
- logs, metadatas = self.os_task_handler.read(
- ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log":
False}
- )
+ @pytest.mark.parametrize("metadata_mode", ["provided", "none", "empty"])
+ def test_read(self, ti, metadata_mode):
+ start_time = pendulum.now()
+ response = _make_os_response(self.os_task_handler.io, self.base_log_source)
- if AIRFLOW_V_3_0_PLUS:
- logs = list(logs)
- expected_msg = "Some Message 1"
- assert logs[0].event == "::group::Log message source details"
- assert logs[0].sources == ["http://localhost"]
- assert logs[1].event == "::endgroup::"
- assert logs[2].event == expected_msg
- metadata = metadatas
- else:
- expected_msg = "Some Message 1\nAnother Some Message 2"
- assert len(logs) == 1
- assert len(logs) == len(metadatas)
- assert len(logs[0]) == 1
- assert logs[0][0][-1] == expected_msg
+ with patch.object(self.os_task_handler.io, "_os_read",
return_value=response):
+ if metadata_mode == "provided":
+ logs, metadatas = self.os_task_handler.read(
+ ti,
+ 1,
+ {"offset": 0, "last_log_timestamp": str(start_time),
"end_of_log": False},
+ )
+ elif metadata_mode == "empty":
+ logs, metadatas = self.os_task_handler.read(ti, 1, {})
+ else:
+ logs, metadatas = self.os_task_handler.read(ti, 1)
- metadata = metadatas[0]
+ metadata = _assert_log_events(
+ logs,
+ metadatas,
+ expected_events=[self.test_message],
+ expected_sources=["http://localhost"],
+ )
assert not metadata["end_of_log"]
- assert timezone.parse(metadata["last_log_timestamp"]) > ts
+ assert metadata["offset"] == "1"
+ assert timezone.parse(metadata["last_log_timestamp"]) >= start_time
@pytest.mark.db_test
- def test_read_with_patterns(self, ti):
- ts = pendulum.now()
- with mock.patch.object(self.os_task_handler, "index_patterns",
new="test_*,other_*"):
- logs, metadatas = self.os_task_handler.read(
- ti, 1, {"offset": 0, "last_log_timestamp": str(ts),
"end_of_log": False}
- )
-
- if AIRFLOW_V_3_0_PLUS:
- logs = list(logs)
- expected_msg = "Some Message 1"
- assert logs[0].event == "::group::Log message source details"
- assert logs[0].sources == ["http://localhost"]
- assert logs[1].event == "::endgroup::"
- assert logs[2].event == expected_msg
- metadata = metadatas
- else:
- expected_msg = "Some Message 1\nAnother Some Message 2"
- assert len(logs) == 1
- assert len(logs) == len(metadatas)
- assert len(logs[0]) == 1
- assert logs[0][0][-1] == expected_msg
-
- metadata = metadatas[0]
+ def test_read_defaults_offset_when_missing_from_metadata(self, ti):
+ start_time = pendulum.now()
+ with patch.object(self.os_task_handler.io, "_os_read",
return_value=None):
+ logs, metadatas = self.os_task_handler.read(ti, 1, {"end_of_log":
False})
- assert not metadata["end_of_log"]
- assert timezone.parse(metadata["last_log_timestamp"]) > ts
+ metadata = _assert_no_logs(logs, metadatas)
+ assert metadata["end_of_log"]
+ assert metadata["offset"] == "0"
+ assert timezone.parse(metadata["last_log_timestamp"]) >= start_time
@pytest.mark.db_test
- def test_read_with_patterns_no_match(self, ti):
- ts = pendulum.now()
- with mock.patch.object(self.os_task_handler, "index_patterns",
new="test_other_*,test_another_*"):
- with mock.patch.object(
- self.os_task_handler.client,
- "search",
- return_value={
- "_shards": {"failed": 0, "skipped": 0, "successful": 7,
"total": 7},
- "hits": {"hits": []},
- "timed_out": False,
- "took": 7,
- },
- ):
- logs, metadatas = self.os_task_handler.read(
- ti,
- 1,
- {"offset": 0, "last_log_timestamp": str(ts), "end_of_log":
False},
- )
- if AIRFLOW_V_3_0_PLUS:
- assert logs == []
+ @pytest.mark.parametrize("seconds", [3, 6])
+ def test_read_missing_logs(self, ti, seconds):
+ start_time = pendulum.now().add(seconds=-seconds)
+ with patch.object(self.os_task_handler.io, "_os_read",
return_value=None):
+ logs, metadatas = self.os_task_handler.read(
+ ti,
+ 1,
+ {"offset": 0, "last_log_timestamp": str(start_time),
"end_of_log": False},
+ )
- metadata = metadatas
+ metadata = _metadata_from_result(metadatas)
+ if seconds > 5:
+ _assert_missing_log_message(logs)
else:
- assert len(logs) == 1
- assert len(logs) == len(metadatas)
- assert logs == [[]]
-
- metadata = metadatas[0]
+ _assert_no_logs(logs, metadatas)
- assert not metadata["end_of_log"]
+ assert metadata["end_of_log"]
assert metadata["offset"] == "0"
- # last_log_timestamp won't change if no log lines read.
- assert timezone.parse(metadata["last_log_timestamp"]) == ts
+ assert timezone.parse(metadata["last_log_timestamp"]) == start_time
@pytest.mark.db_test
- def test_read_with_missing_index(self, ti):
- ts = pendulum.now()
- with mock.patch.object(self.os_task_handler, "index_patterns",
new="nonexistent,test_*"):
- with mock.patch.object(
- self.os_task_handler.client,
- "count",
- side_effect=NotFoundError(404, "IndexNotFoundError"),
- ):
- with pytest.raises(NotFoundError, match=r"IndexNotFoundError"):
- self.os_task_handler.read(
- ti,
- 1,
- {
- "offset": 0,
- "last_log_timestamp": str(ts),
- "end_of_log": False,
- },
- )
+ def test_read_timeout(self, ti):
+ start_time = pendulum.now().subtract(minutes=5)
+ with patch.object(self.os_task_handler.io, "_os_read",
return_value=None):
+ logs, metadatas = self.os_task_handler.read(
+ ti,
+ 1,
+ {"offset": 1, "last_log_timestamp": str(start_time),
"end_of_log": False},
+ )
- @pytest.mark.parametrize("seconds", [3, 6])
- @pytest.mark.db_test
- def test_read_missing_logs(self, seconds, create_task_instance):
- """
- When the log actually isn't there to be found, we only want to wait for 5 seconds.
- In this case we expect to receive a message of the form 'Log {log_id} not found in Opensearch ...'
- """
- ti = get_ti(
- self.DAG_ID,
- self.TASK_ID,
- pendulum.instance(self.LOGICAL_DATE).add(days=1), # so logs are not found
- create_task_instance=create_task_instance,
- )
- ts = pendulum.now().add(seconds=-seconds)
- with mock.patch.object(
- self.os_task_handler.client,
- "search",
- return_value={
- "_shards": {"failed": 0, "skipped": 0, "successful": 7,
"total": 7},
- "hits": {"hits": []},
- "timed_out": False,
- "took": 7,
- },
- ):
- logs, metadatas = self.os_task_handler.read(ti, 1, {"offset": 0, "last_log_timestamp": str(ts)})
- if AIRFLOW_V_3_0_PLUS:
- logs = list(logs)
- if seconds > 5:
- # we expect a log not found message when checking began more than 5 seconds ago
- assert len(logs) == 1
- actual_message = logs[0].event
- expected_pattern = r"^\*\*\* Log .* not found in Opensearch.*"
- assert re.match(expected_pattern, actual_message) is not None
- assert metadatas["end_of_log"] is True
- else:
- # we've "waited" less than 5 seconds so it should not be "end
of log" and should be no log message
- assert logs == []
- assert metadatas["end_of_log"] is False
- assert metadatas["offset"] == "0"
- assert timezone.parse(metadatas["last_log_timestamp"]) == ts
- else:
- assert len(logs) == 1
- if seconds > 5:
- # we expect a log not found message when checking began more than 5 seconds ago
- assert len(logs[0]) == 1
- actual_message = logs[0][0][1]
- expected_pattern = r"^\*\*\* Log .* not found in Opensearch.*"
- assert re.match(expected_pattern, actual_message) is not None
- assert metadatas[0]["end_of_log"] is True
- else:
- # we've "waited" less than 5 seconds so it should not be "end
of log" and should be no log message
- assert len(logs[0]) == 0
- assert logs == [[]]
- assert metadatas[0]["end_of_log"] is False
- assert len(logs) == len(metadatas)
- assert metadatas[0]["offset"] == "0"
- assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts
+ metadata = _assert_no_logs(logs, metadatas)
+ assert metadata["end_of_log"]
+ assert metadata["offset"] == "1"
+ assert timezone.parse(metadata["last_log_timestamp"]) == start_time
@pytest.mark.db_test
- def test_read_with_none_metadata(self, ti):
- logs, metadatas = self.os_task_handler.read(ti, 1)
-
- if AIRFLOW_V_3_0_PLUS:
- logs = list(logs)
- expected_message = "Some Message 1"
- assert logs[0].event == "::group::Log message source details"
- assert logs[0].sources == ["http://localhost"]
- assert logs[1].event == "::endgroup::"
- assert logs[2].event == expected_message
-
- metadata = metadatas
- else:
- expected_message = "Some Message 1\nAnother Some Message 2"
- assert len(logs) == 1
- assert len(logs) == len(metadatas)
- assert len(logs[0]) == 1
- assert logs[0][0][-1] == expected_message
+ def test_read_with_custom_offset_and_host_fields(self, ti):
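+ # The dotted field names point into the nested hit document built
+ # below ("host" -> "name", "log" -> "offset").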
+ self.os_task_handler.host_field = "host.name"
+ self.os_task_handler.offset_field = "log.offset"
+ self.os_task_handler.io.host_field = "host.name"
+ self.os_task_handler.io.offset_field = "log.offset"
+ response = _make_os_response(
+ self.os_task_handler.io,
+ {
+ "message": self.test_message,
+ "event": self.test_message,
+ "log_id": self.LOG_ID,
+ "log": {"offset": 1},
+ "host": {"name": "somehostname"},
+ },
+ )
- metadata = metadatas[0]
+ with patch.object(self.os_task_handler.io, "_os_read", return_value=response):
+ logs, metadatas = self.os_task_handler.read(
+ ti,
+ 1,
+ {"offset": 0, "last_log_timestamp": str(pendulum.now()),
"end_of_log": False},
+ )
+ metadata = _assert_log_events(
+ logs,
+ metadatas,
+ expected_events=[self.test_message],
+ expected_sources=["somehostname"],
+ )
+ assert metadata["offset"] == "1"
assert not metadata["end_of_log"]
- assert timezone.parse(metadata["last_log_timestamp"]) < pendulum.now()
@pytest.mark.db_test
def test_set_context(self, ti):
@@ -388,27 +386,29 @@ class TestOpensearchTaskHandler:
@pytest.mark.db_test
def test_set_context_w_json_format_and_write_stdout(self, ti):
- formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
- self.os_task_handler.formatter = formatter
+ self.os_task_handler.formatter = logging.Formatter(
+ "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+ )
self.os_task_handler.write_stdout = True
self.os_task_handler.json_format = True
+
self.os_task_handler.set_context(ti)
+ assert isinstance(self.os_task_handler.formatter, OpensearchJSONFormatter)
+ assert isinstance(self.os_task_handler.handler, logging.StreamHandler)
+ assert self.os_task_handler.context_set
+
@pytest.mark.db_test
def test_close(self, ti):
- formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
- self.os_task_handler.formatter = formatter
+ self.os_task_handler.formatter = logging.Formatter(
+ "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+ )
self.os_task_handler.set_context(ti)
self.os_task_handler.close()
- with open(
- os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
- ) as log_file:
- # end_of_log_mark may contain characters like '\n' which is needed to
- # have the log uploaded but will not be stored in elasticsearch.
- # so apply the strip() to log_file.read()
- log_line = log_file.read().strip()
- assert log_line.endswith(self.end_of_log_mark.strip())
+
+ log_file = Path(self.local_log_location) / self.FILENAME_TEMPLATE.format(try_number=1)
+ assert log_file.read_text().strip().endswith(self.end_of_log_mark.strip())
assert self.os_task_handler.closed
@pytest.mark.db_test
@@ -416,51 +416,34 @@ class TestOpensearchTaskHandler:
ti.raw = True
self.os_task_handler.set_context(ti)
self.os_task_handler.close()
- with open(
- os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
- ) as log_file:
- assert self.end_of_log_mark not in log_file.read()
- assert self.os_task_handler.closed
- @pytest.mark.db_test
- def test_close_closed(self, ti):
- self.os_task_handler.closed = True
- self.os_task_handler.set_context(ti)
- self.os_task_handler.close()
- with open(
- os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
- ) as log_file:
- assert len(log_file.read()) == 0
+ log_file = Path(self.local_log_location) / self.FILENAME_TEMPLATE.format(try_number=1)
+ assert self.end_of_log_mark not in log_file.read_text()
+ assert self.os_task_handler.closed
@pytest.mark.db_test
def test_close_with_no_handler(self, ti):
self.os_task_handler.set_context(ti)
self.os_task_handler.handler = None
self.os_task_handler.close()
- with open(
- os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
- ) as log_file:
- assert len(log_file.read()) == 0
+
+ log_file = Path(self.local_log_location) / self.FILENAME_TEMPLATE.format(try_number=1)
+ assert log_file.read_text() == ""
assert self.os_task_handler.closed
@pytest.mark.db_test
- def test_close_with_no_stream(self, ti):
+ @pytest.mark.parametrize("stream_state", ["none", "closed"])
+ def test_close_reopens_stream(self, ti, stream_state):
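+ # Whether the stream was dropped or already closed, close() must
+ # reopen it so the end-of-log mark still reaches the log file.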
self.os_task_handler.set_context(ti)
- self.os_task_handler.handler.stream = None
- self.os_task_handler.close()
- with open(
- os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
- ) as log_file:
- assert self.end_of_log_mark in log_file.read()
- assert self.os_task_handler.closed
+ if stream_state == "none":
+ self.os_task_handler.handler.stream = None
+ else:
+ self.os_task_handler.handler.stream.close()
- self.os_task_handler.set_context(ti)
- self.os_task_handler.handler.stream.close()
self.os_task_handler.close()
- with open(
- os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
- ) as log_file:
- assert self.end_of_log_mark in log_file.read()
+
+ log_file = Path(self.local_log_location) / self.FILENAME_TEMPLATE.format(try_number=1)
+ assert self.end_of_log_mark in log_file.read_text()
assert self.os_task_handler.closed
@pytest.mark.db_test
@@ -471,21 +454,20 @@ class TestOpensearchTaskHandler:
assert self.os_task_handler._render_log_id(ti, 1) == self.JSON_LOG_ID
def test_clean_date(self):
- clean_execution_date = self.os_task_handler._clean_date(datetime(2016, 7, 8, 9, 10, 11, 12))
- assert clean_execution_date == "2016_07_08T09_10_11_000012"
+ clean_logical_date = OpensearchTaskHandler._clean_date(datetime(2016, 7, 8, 9, 10, 11, 12))
+ assert clean_logical_date == "2016_07_08T09_10_11_000012"
- @mock.patch("sys.__stdout__", new_callable=StringIO)
@pytest.mark.db_test
+ @patch("sys.__stdout__", new_callable=StringIO)
def test_dynamic_offset(self, stdout_mock, ti, time_machine):
- # arrange
handler = OpensearchTaskHandler(
base_log_folder=self.local_log_location,
end_of_log_mark=self.end_of_log_mark,
write_stdout=True,
- host=self.host,
- port=self.port,
- username=self.username,
- password=self.password,
+ host="localhost",
+ port=9200,
+ username="admin",
+ password="admin",
json_format=True,
json_fields=self.json_fields,
host_field=self.host_field,
@@ -493,7 +475,7 @@ class TestOpensearchTaskHandler:
)
handler.formatter = logging.Formatter()
- logger = logging.getLogger(__name__)
+ logger = logging.getLogger("tests.opensearch.dynamic_offset")
logger.handlers = [handler]
logger.propagate = False
@@ -501,17 +483,19 @@ class TestOpensearchTaskHandler:
handler.set_context(ti)
t1 = pendulum.local(year=2017, month=1, day=1, hour=1, minute=1, second=15)
- t2, t3 = t1 + pendulum.duration(seconds=5), t1 + pendulum.duration(seconds=10)
-
- # act
- time_machine.move_to(t1, tick=False)
- ti.log.info("Test")
- time_machine.move_to(t2, tick=False)
- ti.log.info("Test2")
- time_machine.move_to(t3, tick=False)
- ti.log.info("Test3")
+ t2 = t1 + pendulum.duration(seconds=5)
+ t3 = t1 + pendulum.duration(seconds=10)
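+ # Freeze the clock at three known instants so each record carries a
+ # predictable asctime while the offsets keep increasing monotonically.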
+
+ try:
+ time_machine.move_to(t1, tick=False)
+ ti.log.info("Test")
+ time_machine.move_to(t2, tick=False)
+ ti.log.info("Test2")
+ time_machine.move_to(t3, tick=False)
+ ti.log.info("Test3")
+ finally:
+ logger.handlers = []
- # assert
first_log, second_log, third_log = map(json.loads, stdout_mock.getvalue().strip().splitlines())
assert first_log["offset"] < second_log["offset"] < third_log["offset"]
assert first_log["asctime"] == t1.format("YYYY-MM-DDTHH:mm:ss.SSSZZ")
@@ -523,81 +507,212 @@ class TestOpensearchTaskHandler:
mock_callable = Mock(return_value="callable_index_pattern")
mock_import_string.return_value = mock_callable
- self.os_task_handler.index_patterns_callable = "path.to.index_pattern_callable"
- result = self.os_task_handler._get_index_patterns({})
-
- mock_import_string.assert_called_once_with("path.to.index_pattern_callable")
- mock_callable.assert_called_once_with({})
- assert result == "callable_index_pattern"
-
-
-def test_safe_attrgetter():
- class A: ...
-
- a = A()
- a.b = "b"
- a.c = None
- a.x = a
- a.x.d = "blah"
- assert getattr_nested(a, "b", None) == "b" # regular getattr
- assert getattr_nested(a, "x.d", None) == "blah" # nested val
- assert getattr_nested(a, "aa", "heya") == "heya" # respects non-none
default
- assert getattr_nested(a, "c", "heya") is None # respects none value
- assert getattr_nested(a, "aa", None) is None # respects none default
-
-
-def test_retrieve_config_keys():
- """
- Tests that the OpensearchTaskHandler retrieves the correct configuration keys from the config file.
- * old_parameters are removed
- * parameters from config are automatically added
- * constructor parameters missing from config are also added
- :return:
- """
- with conf_vars(
- {
- ("opensearch_configs", "http_compress"): "False",
- ("opensearch_configs", "timeout"): "10",
+ self.os_task_handler.io.index_patterns_callable = "path.to.index_pattern_callable"
+ result = self.os_task_handler.io._get_index_patterns({})
+
+ mock_import_string.assert_called_once_with("path.to.index_pattern_callable")
+ mock_callable.assert_called_once_with({})
+ assert result == "callable_index_pattern"
+
+
+class TestTaskHandlerHelpers:
+ def test_safe_attrgetter(self):
+ class A: ...
+
+ a = A()
+ a.b = "b"
+ a.c = None
+ a.x = a
+ a.x.d = "blah"
+ assert getattr_nested(a, "b", None) == "b"
+ assert getattr_nested(a, "x.d", None) == "blah"
+ assert getattr_nested(a, "aa", "heya") == "heya"
+ assert getattr_nested(a, "c", "heya") is None
+ assert getattr_nested(a, "aa", None) is None
+
+ def test_retrieve_config_keys(self):
+ with conf_vars(
+ {
+ ("opensearch_configs", "http_compress"): "False",
+ ("opensearch_configs", "timeout"): "10",
+ }
+ ):
+ args_from_config = get_os_kwargs_from_config().keys()
+ assert "verify_certs" in args_from_config
+ assert "timeout" in args_from_config
+ assert "http_compress" in args_from_config
+ assert "self" not in args_from_config
+
+
+class TestOpensearchRemoteLogIO:
+ @pytest.fixture(autouse=True)
+ def _setup_tests(self, tmp_path):
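+ # The connection settings below are placeholders: the tests in this
+ # class stub out any network interaction (mocking the client or the
+ # write path), so no live OpenSearch cluster is required.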
+ self.opensearch_io = OpensearchRemoteLogIO(
+ write_to_opensearch=True,
+ write_stdout=True,
+ delete_local_copy=True,
+ host="localhost",
+ port=9200,
+ username="admin",
+ password="admin",
+ base_log_folder=tmp_path,
+ log_id_template="{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}",
+ )
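+ # The template renders ids of the form "<dag>-<task>-<run>-<map_index>-<try>",
+ # e.g. "test_dag-test_task-test_run--1-1" for a non-mapped first try.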
+
+ @pytest.fixture
+ def ti(self):
+ return _MockTI()
+
+ @pytest.fixture
+ def tmp_json_file(self, tmp_path):
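+ # Newline-delimited JSON, one record per line: the on-disk format that
+ # upload() hands to _parse_raw_log.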
+ file_path = tmp_path / "1.log"
+ sample_logs = [
+ {"message": "start"},
+ {"message": "processing"},
+ {"message": "end"},
+ ]
+ file_path.write_text("\n".join(json.dumps(log) for log in sample_logs) + "\n")
+ return file_path
+
+ def test_write_to_stdout(self, tmp_json_file, ti, capsys):
+ self.opensearch_io.write_to_opensearch = False
+ self.opensearch_io.upload(tmp_json_file, ti)
+
+ captured = capsys.readouterr()
+ stdout_lines = captured.out.strip().splitlines()
+ log_entries = [json.loads(line) for line in stdout_lines]
+ assert [entry["message"] for entry in log_entries] == ["start",
"processing", "end"]
+
+ def test_invalid_task_log_file_path(self, ti):
+ with (
+ patch.object(self.opensearch_io, "_parse_raw_log") as mock_parse,
+ patch.object(self.opensearch_io, "_write_to_opensearch") as
mock_write,
+ ):
+ self.opensearch_io.upload(Path("/invalid/path"), ti)
+
+ mock_parse.assert_not_called()
+ mock_write.assert_not_called()
+
+ def test_write_to_opensearch(self, tmp_json_file, ti):
+ self.opensearch_io.write_stdout = False
+ log_id = _render_log_id(self.opensearch_io.log_id_template, ti, ti.try_number)
+ expected_log_lines = self.opensearch_io._parse_raw_log(tmp_json_file.read_text(), log_id)
+
+ with patch.object(self.opensearch_io, "_write_to_opensearch", return_value=True) as mock_write:
+ self.opensearch_io.upload(tmp_json_file, ti)
+
+ mock_write.assert_called_once_with(expected_log_lines)
+
+ def test_raw_log_contains_log_id_and_offset(self, tmp_json_file, ti):
+ raw_log = tmp_json_file.read_text()
+ log_id = _render_log_id(self.opensearch_io.log_id_template, ti, ti.try_number)
+ json_log_lines = self.opensearch_io._parse_raw_log(raw_log, log_id)
+
+ assert len(json_log_lines) == 3
+ assert [line["offset"] for line in json_log_lines] == [1, 2, 3]
+ assert all(line["log_id"] == log_id for line in json_log_lines)
+
+ def test_os_read_builds_expected_query(self, ti):
+ self.opensearch_io.client = Mock()
+ self.opensearch_io.client.count.return_value = {"count": 1}
+ self.opensearch_io.client.search.return_value = _build_os_search_response(
+ {
+ "event": "hello",
+ "log_id": _render_log_id(self.opensearch_io.log_id_template,
ti, ti.try_number),
+ "offset": 3,
+ }
+ )
+ self.opensearch_io.index_patterns = "airflow-logs-*"
+ log_id = _render_log_id(self.opensearch_io.log_id_template, ti, ti.try_number)
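+ # The handler should ask OpenSearch for hits matching the rendered
+ # log_id exactly, restricted to offsets greater than the one already read.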
+ query = {
+ "query": {
+ "bool": {
+ "filter": [{"range": {self.opensearch_io.offset_field:
{"gt": 2}}}],
+ "must": [{"match_phrase": {"log_id": log_id}}],
+ }
+ }
}
- ):
- args_from_config = get_os_kwargs_from_config().keys()
- # verify_certs comes from default config value
- assert "verify_certs" in args_from_config
- # timeout comes from config provided value
- assert "timeout" in args_from_config
- # http_compress comes from config value
- assert "http_compress" in args_from_config
- assert "self" not in args_from_config
+ response = self.opensearch_io._os_read(log_id, 2, ti)
-# ---------------------------------------------------------------------------
-# Tests for the error_detail helpers (issue #63736)
-# ---------------------------------------------------------------------------
+ self.opensearch_io.client.count.assert_called_once_with(index="airflow-logs-*", body=query)
+ self.opensearch_io.client.search.assert_called_once_with(
+ index="airflow-logs-*",
+ body=query,
+ sort=[self.opensearch_io.offset_field],
+ size=self.opensearch_io.MAX_LINE_PER_PAGE,
+ from_=0,
+ )
+ assert response is not None
+ assert response.hits[0].event == "hello"
+ def test_os_read_returns_none_when_count_is_zero(self, ti):
+ self.opensearch_io.client = Mock()
+ self.opensearch_io.client.count.return_value = {"count": 0}
-class TestFormatErrorDetail:
- """Unit tests for _format_error_detail."""
+ log_id = _render_log_id(self.opensearch_io.log_id_template, ti, ti.try_number)
+ response = self.opensearch_io._os_read(log_id, 0, ti)
- def test_returns_none_for_empty(self):
- from airflow.providers.opensearch.log.os_task_handler import _format_error_detail
+ assert response is None
+ self.opensearch_io.client.search.assert_not_called()
+
+ def test_os_read_propagates_missing_index(self, ti):
+ self.opensearch_io.client = Mock()
+ self.opensearch_io.client.count.side_effect = NotFoundError(
+ 404,
+ "IndexMissingException[[missing] missing]",
+ )
+
+ log_id = _render_log_id(self.opensearch_io.log_id_template, ti, ti.try_number)
+ with pytest.raises(NotFoundError):
+ self.opensearch_io._os_read(log_id, 0, ti)
+
+ def test_os_read_logs_and_returns_none_on_search_error(self, ti):
+ self.opensearch_io.client = Mock()
+ self.opensearch_io.client.count.return_value = {"count": 1}
+ self.opensearch_io.client.search.side_effect = RuntimeError("boom")
+
+ log_id = _render_log_id(self.opensearch_io.log_id_template, ti, ti.try_number)
+ with patch.object(self.opensearch_io.log, "exception") as mock_exception:
+ response = self.opensearch_io._os_read(log_id, 0, ti)
+
+ assert response is None
+ mock_exception.assert_called_once()
+ def test_read_returns_missing_log_message_when_os_read_returns_none(self, ti):
+ with patch.object(self.opensearch_io, "_os_read", return_value=None):
+ log_source_info, log_messages = self.opensearch_io.read("", ti)
+
+ log_id = _render_log_id(self.opensearch_io.log_id_template, ti, ti.try_number)
+ assert log_source_info == []
+ assert f"*** Log {log_id} not found in Opensearch" in log_messages[0]
+
+ def test_get_index_patterns_with_callable(self):
+ with patch("airflow.providers.opensearch.log.os_task_handler.import_string") as mock_import_string:
+ mock_callable = Mock(return_value="callable_index_pattern")
+ mock_import_string.return_value = mock_callable
+
+ self.opensearch_io.index_patterns_callable = "path.to.index_pattern_callable"
+ result = self.opensearch_io._get_index_patterns({})
+
+ mock_import_string.assert_called_once_with("path.to.index_pattern_callable")
+ mock_callable.assert_called_once_with({})
+ assert result == "callable_index_pattern"
+
+
+class TestFormatErrorDetail:
+ def test_returns_none_for_empty(self):
assert _format_error_detail(None) is None
assert _format_error_detail([]) is None
def test_returns_string_for_non_list(self):
- from airflow.providers.opensearch.log.os_task_handler import _format_error_detail
-
assert _format_error_detail("raw string") == "raw string"
def test_formats_single_exception(self):
- from airflow.providers.opensearch.log.os_task_handler import _format_error_detail
-
error_detail = [
{
"is_cause": False,
- "frames": [
- {"filename": "/app/task.py", "lineno": 13, "name":
"log_and_raise"},
- ],
+ "frames": [{"filename": "/app/task.py", "lineno": 13, "name":
"log_and_raise"}],
"exc_type": "RuntimeError",
"exc_value": "Something went wrong.",
"exceptions": [],
@@ -611,8 +726,6 @@ class TestFormatErrorDetail:
assert "RuntimeError: Something went wrong." in result
def test_formats_chained_exceptions(self):
- from airflow.providers.opensearch.log.os_task_handler import _format_error_detail
-
error_detail = [
{
"is_cause": True,
@@ -636,8 +749,6 @@ class TestFormatErrorDetail:
assert "RuntimeError: wrapped" in result
def test_exc_type_without_value(self):
- from airflow.providers.opensearch.log.os_task_handler import _format_error_detail
-
error_detail = [
{
"is_cause": False,
@@ -651,19 +762,13 @@ class TestFormatErrorDetail:
assert result.endswith("StopIteration")
def test_non_dict_items_are_stringified(self):
- from airflow.providers.opensearch.log.os_task_handler import _format_error_detail
-
result = _format_error_detail(["unexpected string item"])
assert result is not None
assert "unexpected string item" in result
-class TestBuildLogFields:
- """Unit tests for _build_log_fields."""
-
+class TestBuildStructuredLogFields:
def test_filters_to_allowed_fields(self):
- from airflow.providers.opensearch.log.os_task_handler import _build_log_fields
-
hit = {"event": "hello", "level": "info", "unknown_field": "should be
dropped"}
result = _build_log_fields(hit)
assert "event" in result
@@ -671,41 +776,30 @@ class TestBuildLogFields:
assert "unknown_field" not in result
def test_message_mapped_to_event(self):
- from airflow.providers.opensearch.log.os_task_handler import _build_log_fields
-
hit = {"message": "plain message", "timestamp": "2024-01-01T00:00:00Z"}
fields = _build_log_fields(hit)
assert fields["event"] == "plain message"
- assert "message" not in fields # Ensure it is popped if used as event
+ assert "message" not in fields
def test_message_preserved_if_event_exists(self):
- from airflow.providers.opensearch.log.os_task_handler import _build_log_fields
-
hit = {"event": "structured event", "message": "plain message"}
fields = _build_log_fields(hit)
assert fields["event"] == "structured event"
- # message is preserved if it's in TASK_LOG_FIELDS and doesn't collide with event
assert fields["message"] == "plain message"
def test_levelname_mapped_to_level(self):
- from airflow.providers.opensearch.log.os_task_handler import _build_log_fields
-
hit = {"event": "msg", "levelname": "ERROR"}
result = _build_log_fields(hit)
assert result["level"] == "ERROR"
assert "levelname" not in result
def test_at_timestamp_mapped_to_timestamp(self):
- from airflow.providers.opensearch.log.os_task_handler import _build_log_fields
-
hit = {"event": "msg", "@timestamp": "2024-01-01T00:00:00Z"}
result = _build_log_fields(hit)
assert result["timestamp"] == "2024-01-01T00:00:00Z"
assert "@timestamp" not in result
def test_error_detail_is_kept_as_list(self):
- from airflow.providers.opensearch.log.os_task_handler import _build_log_fields
-
error_detail = [
{
"is_cause": False,
@@ -714,74 +808,11 @@ class TestBuildLogFields:
"exc_value": "Woopsie.",
}
]
- hit = {
- "event": "Task failed with exception",
- "error_detail": error_detail,
- }
+ hit = {"event": "Task failed with exception", "error_detail":
error_detail}
result = _build_log_fields(hit)
assert result["error_detail"] == error_detail
def test_error_detail_dropped_when_empty(self):
- from airflow.providers.opensearch.log.os_task_handler import _build_log_fields
-
hit = {"event": "msg", "error_detail": []}
result = _build_log_fields(hit)
assert "error_detail" not in result
-
- @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage only exists in Airflow 3+")
- def test_read_includes_error_detail_in_structured_message(self):
- """End-to-end: a hit with error_detail should surface it in the returned StructuredLogMessage."""
- from airflow.providers.opensearch.log.os_task_handler import OpensearchTaskHandler
-
- local_log_location = "local/log/location"
- handler = OpensearchTaskHandler(
- base_log_folder=local_log_location,
- end_of_log_mark="end_of_log\n",
- write_stdout=False,
- json_format=False,
- json_fields="asctime,filename,lineno,levelname,message,exc_text",
- host="localhost",
- port=9200,
- username="admin",
- password="password",
- )
-
- log_id = "test_dag-test_task-test_run--1-1"
- body = {
- "event": "Task failed with exception",
- "log_id": log_id,
- "offset": 1,
- "error_detail": [
- {
- "is_cause": False,
- "frames": [
- {"filename": "/opt/airflow/dags/fail.py", "lineno":
13, "name": "log_and_raise"}
- ],
- "exc_type": "RuntimeError",
- "exc_value": "Woopsie. Something went wrong.",
- }
- ],
- }
-
- # Instead of firing up an OpenSearch client, we patch the IO and response class
- mock_hit_dict = body.copy()
- from airflow.providers.opensearch.log.os_response import Hit, OpensearchResponse
-
- mock_hit = Hit({"_source": mock_hit_dict})
- mock_response = mock.MagicMock(spec=OpensearchResponse)
- mock_response.hits = [mock_hit]
- mock_response.__iter__ = mock.Mock(return_value=iter([mock_hit]))
- mock_response.__bool__ = mock.Mock(return_value=True)
- mock_response.__getitem__ = mock.Mock(return_value=mock_hit)
-
- with mock.patch.object(handler, "_os_read",
return_value=mock_response):
- with mock.patch.object(handler, "_group_logs_by_host",
return_value={"localhost": [mock_hit]}):
- from airflow.providers.opensearch.log.os_task_handler import
_build_log_fields
- from airflow.utils.log.file_task_handler import
StructuredLogMessage
-
- fields = _build_log_fields(mock_hit.to_dict())
- msg = StructuredLogMessage(**fields)
-
- assert msg.event == "Task failed with exception"
- assert hasattr(msg, "error_detail")
- assert msg.error_detail == body["error_detail"]