This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new e64b7e41d72 [v3-2-test] Fix: Restore live stdout logging for
Elasticsearch log forwarding (#64067) (#64592)
e64b7e41d72 is described below
commit e64b7e41d72e5bad9ff7d90b8d10797f0a9a0756
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Apr 2 02:42:46 2026 +0300
[v3-2-test] Fix: Restore live stdout logging for Elasticsearch log
forwarding (#64067) (#64592)
This restores the ElasticsearchTaskHandler in airflow_local_settings.py and
ensures LocalExecutor forwards task logs to stdout by passing
subprocess_logs_to_stdout=True to the supervisor.
(cherry picked from commit f1495c283fc9887678fcd467a9063e5e7bfe7e33)
closes: #63960
closes: #49863
closes: #54501
Co-authored-by: Subham <[email protected]>
Co-authored-by: Rahul Vats <[email protected]>
---
airflow-core/newsfragments/64067.bugfix.rst | 1 +
.../config_templates/airflow_local_settings.py | 21 +++++++++++++++++++++
.../src/airflow/executors/local_executor.py | 1 +
.../tests/unit/executors/test_local_executor.py | 7 ++++---
4 files changed, 27 insertions(+), 3 deletions(-)
diff --git a/airflow-core/newsfragments/64067.bugfix.rst
b/airflow-core/newsfragments/64067.bugfix.rst
new file mode 100644
index 00000000000..8ae9a97f544
--- /dev/null
+++ b/airflow-core/newsfragments/64067.bugfix.rst
@@ -0,0 +1 @@
+Restore live stdout logging for Elasticsearch in Airflow 3 by correctly
configuring the handler in ``airflow_local_settings.py`` and forwarding task
logs to stdout in ``LocalExecutor``.
diff --git
a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
index 06639e0e855..17c7cd47d54 100644
--- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
+++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
@@ -291,6 +291,27 @@ if REMOTE_LOGGING:
ELASTICSEARCH_HOST_FIELD: str =
conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
ELASTICSEARCH_OFFSET_FIELD: str =
conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")
ELASTICSEARCH_LOG_ID_TEMPLATE: str =
conf.get_mandatory_value("elasticsearch", "LOG_ID_TEMPLATE")
+ ELASTICSEARCH_END_OF_LOG_MARK: str =
conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
+ ELASTICSEARCH_FRONTEND: str =
conf.get_mandatory_value("elasticsearch", "FRONTEND")
+ ELASTICSEARCH_JSON_FIELDS: str =
conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
+
+ ELASTICSEARCH_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]]
= {
+ "task": {
+ "class":
"airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
+ "formatter": "airflow",
+ "base_log_folder": BASE_LOG_FOLDER,
+ "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
+ "host": ELASTICSEARCH_HOST,
+ "frontend": ELASTICSEARCH_FRONTEND,
+ "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
+ "write_to_es": ELASTICSEARCH_WRITE_TO_ES,
+ "json_format": ELASTICSEARCH_JSON_FORMAT,
+ "json_fields": ELASTICSEARCH_JSON_FIELDS,
+ "host_field": ELASTICSEARCH_HOST_FIELD,
+ "offset_field": ELASTICSEARCH_OFFSET_FIELD,
+ },
+ }
+
DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTICSEARCH_REMOTE_HANDLERS)
REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(
host=ELASTICSEARCH_HOST,
diff --git a/airflow-core/src/airflow/executors/local_executor.py
b/airflow-core/src/airflow/executors/local_executor.py
index 9b5939a0bd2..5703936e1d4 100644
--- a/airflow-core/src/airflow/executors/local_executor.py
+++ b/airflow-core/src/airflow/executors/local_executor.py
@@ -149,6 +149,7 @@ def _execute_work(log: Logger, workload:
workloads.ExecuteTask, team_conf) -> No
token=workload.token,
server=team_conf.get("core", "execution_api_server_url",
fallback=default_execution_api_server),
log_path=workload.log_path,
+ subprocess_logs_to_stdout=True,
)
diff --git a/airflow-core/tests/unit/executors/test_local_executor.py
b/airflow-core/tests/unit/executors/test_local_executor.py
index 59afffe6833..af6507d2642 100644
--- a/airflow-core/tests/unit/executors/test_local_executor.py
+++ b/airflow-core/tests/unit/executors/test_local_executor.py
@@ -268,7 +268,7 @@ class TestLocalExecutor:
with conf_vars(conf_values):
team_conf = ExecutorConf(team_name=None)
- _execute_work(log=mock.ANY, workload=mock.MagicMock(),
team_conf=team_conf)
+ _execute_work(log=mock.MagicMock(), workload=mock.MagicMock(),
team_conf=team_conf)
mock_supervise.assert_called_with(
ti=mock.ANY,
@@ -277,6 +277,7 @@ class TestLocalExecutor:
token=mock.ANY,
server=expected_server,
log_path=mock.ANY,
+ subprocess_logs_to_stdout=True,
)
@mock.patch("airflow.sdk.execution_time.supervisor.supervise")
@@ -303,7 +304,7 @@ class TestLocalExecutor:
with conf_vars(config_overrides):
# Test team-specific config
team_conf = ExecutorConf(team_name=team_name)
- _execute_work(log=mock.ANY, workload=mock.MagicMock(),
team_conf=team_conf)
+ _execute_work(log=mock.MagicMock(), workload=mock.MagicMock(),
team_conf=team_conf)
# Verify team-specific server URL was used
assert mock_supervise.call_count == 1
@@ -314,7 +315,7 @@ class TestLocalExecutor:
# Test global config (no team)
global_conf = ExecutorConf(team_name=None)
- _execute_work(log=mock.ANY, workload=mock.MagicMock(),
team_conf=global_conf)
+ _execute_work(log=mock.MagicMock(), workload=mock.MagicMock(),
team_conf=global_conf)
# Verify default server URL was used
assert mock_supervise.call_count == 1