This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new e64b7e41d72 [v3-2-test] Fix: Restore live stdout logging for Elasticsearch log forwarding (#64067) (#64592)
e64b7e41d72 is described below

commit e64b7e41d72e5bad9ff7d90b8d10797f0a9a0756
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Apr 2 02:42:46 2026 +0300

    [v3-2-test] Fix: Restore live stdout logging for Elasticsearch log forwarding (#64067) (#64592)
    
    This restores the ElasticsearchTaskHandler in airflow_local_settings.py and ensures LocalExecutor forwards task logs to stdout by passing subprocess_logs_to_stdout=True to the supervisor.
    (cherry picked from commit f1495c283fc9887678fcd467a9063e5e7bfe7e33)
    
    
    closes: #63960
    closes: #49863
    closes: #54501
    
    Co-authored-by: Subham <[email protected]>
    Co-authored-by: Rahul Vats <[email protected]>
---
 airflow-core/newsfragments/64067.bugfix.rst         |  1 +
 .../config_templates/airflow_local_settings.py      | 21 +++++++++++++++++++++
 .../src/airflow/executors/local_executor.py         |  1 +
 .../tests/unit/executors/test_local_executor.py     |  7 ++++---
 4 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/airflow-core/newsfragments/64067.bugfix.rst 
b/airflow-core/newsfragments/64067.bugfix.rst
new file mode 100644
index 00000000000..8ae9a97f544
--- /dev/null
+++ b/airflow-core/newsfragments/64067.bugfix.rst
@@ -0,0 +1 @@
+Restore live stdout logging for Elasticsearch in Airflow 3 by correctly configuring the handler in ``airflow_local_settings.py`` and forwarding task logs to stdout in ``LocalExecutor``.
diff --git 
a/airflow-core/src/airflow/config_templates/airflow_local_settings.py 
b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
index 06639e0e855..17c7cd47d54 100644
--- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
+++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
@@ -291,6 +291,27 @@ if REMOTE_LOGGING:
         ELASTICSEARCH_HOST_FIELD: str = 
conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
         ELASTICSEARCH_OFFSET_FIELD: str = 
conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")
         ELASTICSEARCH_LOG_ID_TEMPLATE: str = 
conf.get_mandatory_value("elasticsearch", "LOG_ID_TEMPLATE")
+        ELASTICSEARCH_END_OF_LOG_MARK: str = 
conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
+        ELASTICSEARCH_FRONTEND: str = 
conf.get_mandatory_value("elasticsearch", "FRONTEND")
+        ELASTICSEARCH_JSON_FIELDS: str = 
conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
+
+        ELASTICSEARCH_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] 
= {
+            "task": {
+                "class": 
"airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
+                "formatter": "airflow",
+                "base_log_folder": BASE_LOG_FOLDER,
+                "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
+                "host": ELASTICSEARCH_HOST,
+                "frontend": ELASTICSEARCH_FRONTEND,
+                "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
+                "write_to_es": ELASTICSEARCH_WRITE_TO_ES,
+                "json_format": ELASTICSEARCH_JSON_FORMAT,
+                "json_fields": ELASTICSEARCH_JSON_FIELDS,
+                "host_field": ELASTICSEARCH_HOST_FIELD,
+                "offset_field": ELASTICSEARCH_OFFSET_FIELD,
+            },
+        }
+        
DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTICSEARCH_REMOTE_HANDLERS)
 
         REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(
             host=ELASTICSEARCH_HOST,
diff --git a/airflow-core/src/airflow/executors/local_executor.py 
b/airflow-core/src/airflow/executors/local_executor.py
index 9b5939a0bd2..5703936e1d4 100644
--- a/airflow-core/src/airflow/executors/local_executor.py
+++ b/airflow-core/src/airflow/executors/local_executor.py
@@ -149,6 +149,7 @@ def _execute_work(log: Logger, workload: 
workloads.ExecuteTask, team_conf) -> No
         token=workload.token,
         server=team_conf.get("core", "execution_api_server_url", 
fallback=default_execution_api_server),
         log_path=workload.log_path,
+        subprocess_logs_to_stdout=True,
     )
 
 
diff --git a/airflow-core/tests/unit/executors/test_local_executor.py 
b/airflow-core/tests/unit/executors/test_local_executor.py
index 59afffe6833..af6507d2642 100644
--- a/airflow-core/tests/unit/executors/test_local_executor.py
+++ b/airflow-core/tests/unit/executors/test_local_executor.py
@@ -268,7 +268,7 @@ class TestLocalExecutor:
 
         with conf_vars(conf_values):
             team_conf = ExecutorConf(team_name=None)
-            _execute_work(log=mock.ANY, workload=mock.MagicMock(), 
team_conf=team_conf)
+            _execute_work(log=mock.MagicMock(), workload=mock.MagicMock(), 
team_conf=team_conf)
 
             mock_supervise.assert_called_with(
                 ti=mock.ANY,
@@ -277,6 +277,7 @@ class TestLocalExecutor:
                 token=mock.ANY,
                 server=expected_server,
                 log_path=mock.ANY,
+                subprocess_logs_to_stdout=True,
             )
 
     @mock.patch("airflow.sdk.execution_time.supervisor.supervise")
@@ -303,7 +304,7 @@ class TestLocalExecutor:
             with conf_vars(config_overrides):
                 # Test team-specific config
                 team_conf = ExecutorConf(team_name=team_name)
-                _execute_work(log=mock.ANY, workload=mock.MagicMock(), 
team_conf=team_conf)
+                _execute_work(log=mock.MagicMock(), workload=mock.MagicMock(), 
team_conf=team_conf)
 
                 # Verify team-specific server URL was used
                 assert mock_supervise.call_count == 1
@@ -314,7 +315,7 @@ class TestLocalExecutor:
 
                 # Test global config (no team)
                 global_conf = ExecutorConf(team_name=None)
-                _execute_work(log=mock.ANY, workload=mock.MagicMock(), 
team_conf=global_conf)
+                _execute_work(log=mock.MagicMock(), workload=mock.MagicMock(), 
team_conf=global_conf)
 
                 # Verify default server URL was used
                 assert mock_supervise.call_count == 1

Reply via email to