This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9465591c0f6 feat: restore os provider on core library (#64764)
9465591c0f6 is described below

commit 9465591c0f696504488ee78c2e5f5391f28af66b
Author: Owen Leung <[email protected]>
AuthorDate: Fri Apr 10 16:39:50 2026 +0800

    feat: restore os provider on core library (#64764)
    
    * add changes for os provider
    
    * bump pyproject.toml provider lib version
    
    * Fix failing CI
---
 .../config_templates/airflow_local_settings.py     | 44 +++++++++++-----------
 pyproject.toml                                     |  4 +-
 scripts/ci/prek/update_airflow_pyproject_toml.py   |  1 +
 3 files changed, 26 insertions(+), 23 deletions(-)

diff --git 
a/airflow-core/src/airflow/config_templates/airflow_local_settings.py 
b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
index 48f14b0f9a9..0c54c555735 100644
--- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
+++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
@@ -334,34 +334,36 @@ if REMOTE_LOGGING:
         )
 
     elif OPENSEARCH_HOST:
-        OPENSEARCH_END_OF_LOG_MARK: str = 
conf.get_mandatory_value("opensearch", "END_OF_LOG_MARK")
-        OPENSEARCH_PORT: str = conf.get_mandatory_value("opensearch", "PORT")
+        from airflow.providers.opensearch.log.os_task_handler import 
OpensearchRemoteLogIO
+
+        OPENSEARCH_PORT = conf.getint("opensearch", "PORT", fallback=9200)
         OPENSEARCH_USERNAME: str = conf.get_mandatory_value("opensearch", 
"USERNAME")
         OPENSEARCH_PASSWORD: str = conf.get_mandatory_value("opensearch", 
"PASSWORD")
         OPENSEARCH_WRITE_STDOUT: bool = conf.getboolean("opensearch", 
"WRITE_STDOUT")
+        OPENSEARCH_WRITE_TO_OS: bool = conf.getboolean("opensearch", 
"WRITE_TO_OS")
         OPENSEARCH_JSON_FORMAT: bool = conf.getboolean("opensearch", 
"JSON_FORMAT")
-        OPENSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("opensearch", 
"JSON_FIELDS")
+        OPENSEARCH_TARGET_INDEX: str = conf.get_mandatory_value("opensearch", 
"TARGET_INDEX")
         OPENSEARCH_HOST_FIELD: str = conf.get_mandatory_value("opensearch", 
"HOST_FIELD")
         OPENSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("opensearch", 
"OFFSET_FIELD")
+        OPENSEARCH_LOG_ID_TEMPLATE: str = conf.get("opensearch", 
"LOG_ID_TEMPLATE", fallback="") or (
+            "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
+        )
 
-        OPENSEARCH_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
-            "task": {
-                "class": 
"airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler",
-                "formatter": "airflow",
-                "base_log_folder": BASE_LOG_FOLDER,
-                "end_of_log_mark": OPENSEARCH_END_OF_LOG_MARK,
-                "host": OPENSEARCH_HOST,
-                "port": OPENSEARCH_PORT,
-                "username": OPENSEARCH_USERNAME,
-                "password": OPENSEARCH_PASSWORD,
-                "write_stdout": OPENSEARCH_WRITE_STDOUT,
-                "json_format": OPENSEARCH_JSON_FORMAT,
-                "json_fields": OPENSEARCH_JSON_FIELDS,
-                "host_field": OPENSEARCH_HOST_FIELD,
-                "offset_field": OPENSEARCH_OFFSET_FIELD,
-            },
-        }
-        DEFAULT_LOGGING_CONFIG["handlers"].update(OPENSEARCH_REMOTE_HANDLERS)
+        REMOTE_TASK_LOG = OpensearchRemoteLogIO(
+            host=OPENSEARCH_HOST,
+            port=OPENSEARCH_PORT,
+            username=OPENSEARCH_USERNAME,
+            password=OPENSEARCH_PASSWORD,
+            target_index=OPENSEARCH_TARGET_INDEX,
+            write_stdout=OPENSEARCH_WRITE_STDOUT,
+            write_to_opensearch=OPENSEARCH_WRITE_TO_OS,
+            offset_field=OPENSEARCH_OFFSET_FIELD,
+            host_field=OPENSEARCH_HOST_FIELD,
+            base_log_folder=BASE_LOG_FOLDER,
+            delete_local_copy=delete_local_copy,
+            json_format=OPENSEARCH_JSON_FORMAT,
+            log_id_template=OPENSEARCH_LOG_ID_TEMPLATE,
+        )
     else:
         raise AirflowException(
             "Incorrect remote log configuration. Please check the 
configuration of option 'host' in "
diff --git a/pyproject.toml b/pyproject.toml
index 3bebe69019c..b7b3a444a41 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -297,7 +297,7 @@ apache-airflow = "airflow.__main__:main"
     "apache-airflow-providers-openlineage>=2.3.0" # Set from 
MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
 ]
 "opensearch" = [
-    "apache-airflow-providers-opensearch>=1.5.0"
+    "apache-airflow-providers-opensearch>=1.9.0" # Set from 
MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
 ]
 "opsgenie" = [
     "apache-airflow-providers-opsgenie>=5.8.0"
@@ -464,7 +464,7 @@ apache-airflow = "airflow.__main__:main"
     "apache-airflow-providers-openai>=1.5.0",
     "apache-airflow-providers-openfaas>=3.7.0",
     "apache-airflow-providers-openlineage>=2.3.0", # Set from 
MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
-    "apache-airflow-providers-opensearch>=1.5.0",
+    "apache-airflow-providers-opensearch>=1.9.0", # Set from 
MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
     "apache-airflow-providers-opsgenie>=5.8.0",
     "apache-airflow-providers-oracle>=3.12.0",
     "apache-airflow-providers-pagerduty>=3.8.1",
diff --git a/scripts/ci/prek/update_airflow_pyproject_toml.py 
b/scripts/ci/prek/update_airflow_pyproject_toml.py
index 2143432dedd..e912f3aee59 100755
--- a/scripts/ci/prek/update_airflow_pyproject_toml.py
+++ b/scripts/ci/prek/update_airflow_pyproject_toml.py
@@ -89,6 +89,7 @@ MIN_VERSION_OVERRIDE: dict[str, Version] = {
     "git": parse_version("0.0.2"),
     "common.messaging": parse_version("2.0.0"),
     "elasticsearch": parse_version("6.5.0"),
+    "opensearch": parse_version("1.9.0"),
 }
 
 

Reply via email to