This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9465591c0f6 feat: restore os provider on core library (#64764)
9465591c0f6 is described below
commit 9465591c0f696504488ee78c2e5f5391f28af66b
Author: Owen Leung <[email protected]>
AuthorDate: Fri Apr 10 16:39:50 2026 +0800
feat: restore os provider on core library (#64764)
* add changes for os provider
* bump pyproject.toml provider lib version
* Fix failing CI
---
.../config_templates/airflow_local_settings.py | 44 +++++++++++-----------
pyproject.toml | 4 +-
scripts/ci/prek/update_airflow_pyproject_toml.py | 1 +
3 files changed, 26 insertions(+), 23 deletions(-)
diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
index 48f14b0f9a9..0c54c555735 100644
--- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
+++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
@@ -334,34 +334,36 @@ if REMOTE_LOGGING:
)
elif OPENSEARCH_HOST:
- OPENSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("opensearch", "END_OF_LOG_MARK")
- OPENSEARCH_PORT: str = conf.get_mandatory_value("opensearch", "PORT")
+ from airflow.providers.opensearch.log.os_task_handler import OpensearchRemoteLogIO
+
+ OPENSEARCH_PORT = conf.getint("opensearch", "PORT", fallback=9200)
OPENSEARCH_USERNAME: str = conf.get_mandatory_value("opensearch",
"USERNAME")
OPENSEARCH_PASSWORD: str = conf.get_mandatory_value("opensearch",
"PASSWORD")
OPENSEARCH_WRITE_STDOUT: bool = conf.getboolean("opensearch",
"WRITE_STDOUT")
+ OPENSEARCH_WRITE_TO_OS: bool = conf.getboolean("opensearch",
"WRITE_TO_OS")
OPENSEARCH_JSON_FORMAT: bool = conf.getboolean("opensearch",
"JSON_FORMAT")
- OPENSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("opensearch",
"JSON_FIELDS")
+ OPENSEARCH_TARGET_INDEX: str = conf.get_mandatory_value("opensearch",
"TARGET_INDEX")
OPENSEARCH_HOST_FIELD: str = conf.get_mandatory_value("opensearch",
"HOST_FIELD")
OPENSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("opensearch",
"OFFSET_FIELD")
+ OPENSEARCH_LOG_ID_TEMPLATE: str = conf.get("opensearch",
"LOG_ID_TEMPLATE", fallback="") or (
+ "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
+ )
- OPENSEARCH_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
- "task": {
- "class":
"airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler",
- "formatter": "airflow",
- "base_log_folder": BASE_LOG_FOLDER,
- "end_of_log_mark": OPENSEARCH_END_OF_LOG_MARK,
- "host": OPENSEARCH_HOST,
- "port": OPENSEARCH_PORT,
- "username": OPENSEARCH_USERNAME,
- "password": OPENSEARCH_PASSWORD,
- "write_stdout": OPENSEARCH_WRITE_STDOUT,
- "json_format": OPENSEARCH_JSON_FORMAT,
- "json_fields": OPENSEARCH_JSON_FIELDS,
- "host_field": OPENSEARCH_HOST_FIELD,
- "offset_field": OPENSEARCH_OFFSET_FIELD,
- },
- }
- DEFAULT_LOGGING_CONFIG["handlers"].update(OPENSEARCH_REMOTE_HANDLERS)
+ REMOTE_TASK_LOG = OpensearchRemoteLogIO(
+ host=OPENSEARCH_HOST,
+ port=OPENSEARCH_PORT,
+ username=OPENSEARCH_USERNAME,
+ password=OPENSEARCH_PASSWORD,
+ target_index=OPENSEARCH_TARGET_INDEX,
+ write_stdout=OPENSEARCH_WRITE_STDOUT,
+ write_to_opensearch=OPENSEARCH_WRITE_TO_OS,
+ offset_field=OPENSEARCH_OFFSET_FIELD,
+ host_field=OPENSEARCH_HOST_FIELD,
+ base_log_folder=BASE_LOG_FOLDER,
+ delete_local_copy=delete_local_copy,
+ json_format=OPENSEARCH_JSON_FORMAT,
+ log_id_template=OPENSEARCH_LOG_ID_TEMPLATE,
+ )
else:
raise AirflowException(
"Incorrect remote log configuration. Please check the configuration of option 'host' in "
diff --git a/pyproject.toml b/pyproject.toml
index 3bebe69019c..b7b3a444a41 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -297,7 +297,7 @@ apache-airflow = "airflow.__main__:main"
"apache-airflow-providers-openlineage>=2.3.0" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
]
"opensearch" = [
- "apache-airflow-providers-opensearch>=1.5.0"
+ "apache-airflow-providers-opensearch>=1.9.0" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
]
"opsgenie" = [
"apache-airflow-providers-opsgenie>=5.8.0"
@@ -464,7 +464,7 @@ apache-airflow = "airflow.__main__:main"
"apache-airflow-providers-openai>=1.5.0",
"apache-airflow-providers-openfaas>=3.7.0",
"apache-airflow-providers-openlineage>=2.3.0", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
- "apache-airflow-providers-opensearch>=1.5.0",
+ "apache-airflow-providers-opensearch>=1.9.0", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
"apache-airflow-providers-opsgenie>=5.8.0",
"apache-airflow-providers-oracle>=3.12.0",
"apache-airflow-providers-pagerduty>=3.8.1",
diff --git a/scripts/ci/prek/update_airflow_pyproject_toml.py b/scripts/ci/prek/update_airflow_pyproject_toml.py
index 2143432dedd..e912f3aee59 100755
--- a/scripts/ci/prek/update_airflow_pyproject_toml.py
+++ b/scripts/ci/prek/update_airflow_pyproject_toml.py
@@ -89,6 +89,7 @@ MIN_VERSION_OVERRIDE: dict[str, Version] = {
"git": parse_version("0.0.2"),
"common.messaging": parse_version("2.0.0"),
"elasticsearch": parse_version("6.5.0"),
+ "opensearch": parse_version("1.9.0"),
}