This is an automated email from the ASF dual-hosted git repository.

feluelle pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c1a3cc9a69 Fix default json encoder serialization in Task SDK logging (#45962)
2c1a3cc9a69 is described below

commit 2c1a3cc9a69e6962fa4cd7e05b38b1fbe07b33b2
Author: Ian Buss <[email protected]>
AuthorDate: Wed Jan 29 09:59:43 2025 +0000

    Fix default json encoder serialization in Task SDK logging (#45962)
    
    * Use orjson for json serialization in Task SDK logging
    
    * Formatting fixes
    
    * Use msgspec enc_hook
    
    * Sort pyproject dependencies
---
 task_sdk/pyproject.toml         |  2 +-
 task_sdk/src/airflow/sdk/log.py |  6 ++---
 task_sdk/tests/conftest.py      | 36 ++++++++++++++++----------
 task_sdk/tests/test_log.py      | 56 +++++++++++++++++++++++++++++++++++++++++
 4 files changed, 82 insertions(+), 18 deletions(-)

diff --git a/task_sdk/pyproject.toml b/task_sdk/pyproject.toml
index e2eefdd5fcf..06b3a940406 100644
--- a/task_sdk/pyproject.toml
+++ b/task_sdk/pyproject.toml
@@ -27,7 +27,7 @@ dependencies = [
     "httpx>=0.27.0",
     "jinja2>=3.1.4",
     "methodtools>=0.4.7",
-    "msgspec>=0.18.6",
+    "msgspec>=0.19.0",
     "psutil>=6.1.0",
     "structlog>=24.4.0",
     "retryhttp>=1.2.0",
diff --git a/task_sdk/src/airflow/sdk/log.py b/task_sdk/src/airflow/sdk/log.py
index 3290d0ff7f3..fa5b113588b 100644
--- a/task_sdk/src/airflow/sdk/log.py
+++ b/task_sdk/src/airflow/sdk/log.py
@@ -196,13 +196,11 @@ def logging_processors(
         else:
             exc_group_processor = None
 
-        encoder = msgspec.json.Encoder()
-
         def json_dumps(msg, default):
-            return encoder.encode(msg)
+            return msgspec.json.encode(msg, enc_hook=default)
 
         def json_processor(logger: Any, method_name: Any, event_dict: EventDict) -> str:
-            return encoder.encode(event_dict).decode("utf-8")
+            return msgspec.json.encode(event_dict).decode("utf-8")
 
         json = structlog.processors.JSONRenderer(serializer=json_dumps)
 
diff --git a/task_sdk/tests/conftest.py b/task_sdk/tests/conftest.py
index 3fc7fc18015..e24f6e397d3 100644
--- a/task_sdk/tests/conftest.py
+++ b/task_sdk/tests/conftest.py
@@ -62,18 +62,19 @@ def pytest_runtest_setup(item):
 
 class LogCapture:
     # Like structlog.typing.LogCapture, but that doesn't add log_level in to the event dict
-    entries: list[EventDict]
+    entries: list[EventDict | bytes]
 
     def __init__(self) -> None:
         self.entries = []
 
-    def __call__(self, _: WrappedLogger, method_name: str, event_dict: EventDict) -> NoReturn:
+    def __call__(self, _: WrappedLogger, method_name: str, event: EventDict | bytes) -> NoReturn:
         from structlog.exceptions import DropEvent
 
-        if "level" not in event_dict:
-            event_dict["_log_level"] = method_name
+        if isinstance(event, dict):
+            if "level" not in event:
+                event["_log_level"] = method_name
 
-        self.entries.append(event_dict)
+        self.entries.append(event)
 
         raise DropEvent
 
@@ -93,20 +94,29 @@ def captured_logs(request):
     reset_logging()
     configure_logging(enable_pretty_log=False)
 
-    # Get log level from test parameter, defaulting to INFO if not provided
-    log_level = getattr(request, "param", logging.INFO)
+    # Get log level from test parameter, which can either be a single log level or a
+    # tuple of log level and desired output type, defaulting to INFO if not provided
+    log_level = logging.INFO
+    output = "dict"
+    param = getattr(request, "param", logging.INFO)
+    if isinstance(param, int):
+        log_level = param
+    elif isinstance(param, tuple):
+        log_level = param[0]
+        output = param[1]
 
     # We want to capture all logs, but we don't want to see them in the test output
     structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(log_level))
 
-    # But we need to replace remove the last processor (the one that turns JSON into text, as we want the
-    # event dict for tests)
     cur_processors = structlog.get_config()["processors"]
     processors = cur_processors.copy()
-    proc = processors.pop()
-    assert isinstance(
-        proc, (structlog.dev.ConsoleRenderer, structlog.processors.JSONRenderer)
-    ), "Pre-condition"
+    if output == "dict":
+        # We need to replace remove the last processor (the one that turns JSON into text, as we want the
+        # event dict for tests)
+        proc = processors.pop()
+        assert isinstance(
+            proc, (structlog.dev.ConsoleRenderer, structlog.processors.JSONRenderer)
+        ), "Pre-condition"
     try:
         cap = LogCapture()
         processors.append(cap)
diff --git a/task_sdk/tests/test_log.py b/task_sdk/tests/test_log.py
new file mode 100644
index 00000000000..bf00f33e9a7
--- /dev/null
+++ b/task_sdk/tests/test_log.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import json
+import logging
+import unittest.mock
+
+import pytest
+import structlog
+from uuid6 import UUID
+
+from airflow.sdk.api.datamodels._generated import TaskInstance
+
+
[email protected](
+    "captured_logs", [(logging.INFO, "json")], indirect=True, ids=["log_level=info,formatter=json"]
+)
+def test_json_rendering(captured_logs):
+    """
+    Test that the JSON formatter renders correctly.
+    """
+    logger = structlog.get_logger()
+    logger.info(
+        "A test message with a Pydantic class",
+        pydantic_class=TaskInstance(
+            id=UUID("ffec3c8e-2898-46f8-b7d5-3cc571577368"),
+            dag_id="test_dag",
+            task_id="test_task",
+            run_id="test_run",
+            try_number=1,
+        ),
+    )
+    assert captured_logs
+    assert isinstance(captured_logs[0], bytes)
+    assert json.loads(captured_logs[0]) == {
+        "event": "A test message with a Pydantic class",
+        "pydantic_class": "TaskInstance(id=UUID('ffec3c8e-2898-46f8-b7d5-3cc571577368'), task_id='test_task', dag_id='test_dag', run_id='test_run', try_number=1, map_index=-1, hostname=None)",
+        "timestamp": unittest.mock.ANY,
+        "level": "info",
+    }

Reply via email to