This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ed72b480c5 Refactor xcom API to use shared serialisation constants (#64148)
0ed72b480c5 is described below

commit 0ed72b480c572f5dab81084530b79fb89589dce2
Author: Amogh Desai <[email protected]>
AuthorDate: Fri Mar 27 13:51:06 2026 +0530

    Refactor xcom API to use shared serialisation constants (#64148)
---
 airflow-core/pyproject.toml                        |  2 +
 airflow-core/src/airflow/_shared/serialization     |  1 +
 .../api_fastapi/core_api/datamodels/xcom.py        | 23 ++++++++++
 .../core_api/routes/public/test_xcom.py            | 19 ++++++++
 pyproject.toml                                     |  3 ++
 shared/serialization/.gitignore                    |  1 +
 shared/serialization/pyproject.toml                | 53 ++++++++++++++++++++++
 .../src/airflow_shared/serialization/__init__.py   | 43 ++++++++++++++++++
 shared/serialization/tests/conftest.py             | 21 +++++++++
 .../serialization/tests/serialization/__init__.py  | 16 +++++++
 .../tests/serialization/test_constants.py          | 22 +++++++++
 task-sdk/pyproject.toml                            |  2 +
 task-sdk/src/airflow/sdk/_shared/serialization     |  1 +
 task-sdk/src/airflow/sdk/serde/__init__.py         | 19 ++++----
 uv.lock                                            | 22 ++++++++-
 15 files changed, 236 insertions(+), 12 deletions(-)

diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml
index d2e586e58bb..14e7bf266c4 100644
--- a/airflow-core/pyproject.toml
+++ b/airflow-core/pyproject.toml
@@ -248,6 +248,7 @@ exclude = [
 "../shared/observability/src/airflow_shared/observability" = "src/airflow/_shared/observability"
 "../shared/secrets_backend/src/airflow_shared/secrets_backend" = "src/airflow/_shared/secrets_backend"
 "../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/_shared/secrets_masker"
+"../shared/serialization/src/airflow_shared/serialization" = "src/airflow/_shared/serialization"
 "../shared/timezones/src/airflow_shared/timezones" = "src/airflow/_shared/timezones"
 "../shared/listeners/src/airflow_shared/listeners" = "src/airflow/_shared/listeners"
 "../shared/plugins_manager/src/airflow_shared/plugins_manager" = "src/airflow/_shared/plugins_manager"
@@ -330,6 +331,7 @@ shared_distributions = [
     "apache-airflow-shared-observability",
     "apache-airflow-shared-secrets-backend",
     "apache-airflow-shared-secrets-masker",
+    "apache-airflow-shared-serialization",
     "apache-airflow-shared-timezones",
     "apache-airflow-shared-plugins-manager",
     "apache-airflow-shared-providers-discovery",
diff --git a/airflow-core/src/airflow/_shared/serialization b/airflow-core/src/airflow/_shared/serialization
new file mode 120000
index 00000000000..b730c8e6b47
--- /dev/null
+++ b/airflow-core/src/airflow/_shared/serialization
@@ -0,0 +1 @@
+../../../../shared/serialization/src/airflow_shared/serialization
\ No newline at end of file
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py
index ad18de4be34..05cbb47c36c 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py
@@ -90,6 +90,29 @@ class XComCreateBody(StrictBaseModel):
     value: Any
     map_index: int = -1
 
+    @field_validator("value")
+    @classmethod
+    def _check_forbidden_keys(cls, value: Any) -> Any:
+        """Recursively check for forbidden deserialization keys in user-provided XCom data."""
+        from airflow._shared.serialization import FORBIDDEN_XCOM_KEYS
+
+        def _walk_forbidden_keys(obj: Any, path: str = "value") -> None:
+            if isinstance(obj, dict):
+                found = FORBIDDEN_XCOM_KEYS & obj.keys()
+                if found:
+                    raise ValueError(
+                        f"XCom {path} contains reserved serialization keys: {', '.join(sorted(found))}. "
+                        f"These keys are reserved for internal use."
+                    )
+                for k, v in obj.items():
+                    _walk_forbidden_keys(v, f"{path}.{k}")
+            elif isinstance(obj, (list, tuple)):
+                for i, item in enumerate(obj):
+                    _walk_forbidden_keys(item, f"{path}[{i}]")
+
+        _walk_forbidden_keys(value)
+        return value
+
 
 class XComUpdateBody(StrictBaseModel):
     """Payload serializer for updating an XCom entry."""
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
index b8d06eae613..c51469d944e 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
@@ -718,6 +718,25 @@ class TestCreateXComEntry(TestXComEndpoint):
         assert get_resp.json()["key"] == slash_key
         assert get_resp.json()["value"] == json.dumps(TEST_XCOM_VALUE)
 
+    @pytest.mark.parametrize(
+        ("key", "value"),
+        [
+            ("__classname__", {"__classname__": "airflow.sdk.definitions.connection.Connection"}),
+            ("__type", {"__type": "airflow.sdk.definitions.connection.Connection", "__var": {}}),
+            ("__data__", {"nested": {"__data__": "malicious"}}),
+        ],
+    )
+    def test_create_xcom_entry_blocks_forbidden_keys(self, test_client, key, value):
+        """Test that XCom creation blocks deserialization metadata keys."""
+        response = test_client.post(
+            f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries",
+            json={"key": "test_key", "value": value, "map_index": -1},
+        )
+        assert response.status_code == 422
+        detail = str(response.json()["detail"])
+        assert "reserved serialization keys" in detail
+        assert key in detail
+
 
 class TestDeleteXComEntry(TestXComEndpoint):
     def test_delete_xcom_entry(self, test_client, session):
diff --git a/pyproject.toml b/pyproject.toml
index 4adfd24cb51..79938c84a0e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1309,6 +1309,7 @@ dev = [
     "apache-airflow-shared-providers-discovery",
     "apache-airflow-shared-secrets-backend",
     "apache-airflow-shared-secrets-masker",
+    "apache-airflow-shared-serialization",
     "apache-airflow-shared-template-rendering",
     "apache-airflow-shared-timezones",
 ]
@@ -1371,6 +1372,7 @@ apache-airflow-shared-plugins-manager = { workspace = true }
 apache-airflow-shared-providers-discovery = { workspace = true }
 apache-airflow-shared-secrets-backend = { workspace = true }
 apache-airflow-shared-secrets-masker = { workspace = true }
+apache-airflow-shared-serialization = { workspace = true }
 apache-airflow-shared-template-rendering = { workspace = true }
 apache-airflow-shared-timezones = { workspace = true }
 # Automatically generated provider workspace items (update_airflow_pyproject_toml.py)
@@ -1503,6 +1505,7 @@ members = [
     "shared/providers_discovery",
     "shared/secrets_backend",
     "shared/secrets_masker",
+    "shared/serialization",
     "shared/template_rendering",
     "shared/timezones",
     # Automatically generated provider workspace members (update_airflow_pyproject_toml.py)
diff --git a/shared/serialization/.gitignore b/shared/serialization/.gitignore
new file mode 100644
index 00000000000..bff2d762960
--- /dev/null
+++ b/shared/serialization/.gitignore
@@ -0,0 +1 @@
+*.iml
diff --git a/shared/serialization/pyproject.toml b/shared/serialization/pyproject.toml
new file mode 100644
index 00000000000..343bb1c34b1
--- /dev/null
+++ b/shared/serialization/pyproject.toml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[project]
+name = "apache-airflow-shared-serialization"
+description = "Shared serialization constants for Airflow distributions"
+version = "0.0"
+classifiers = [
+    "Private :: Do Not Upload",
+]
+
+dependencies = []
+
+[dependency-groups]
+dev = [
+    "apache-airflow-devel-common",
+]
+
+[build-system]
+requires = [
+    "hatchling==1.29.0",
+    "packaging==26.0",
+    "pathspec==1.0.4",
+    "pluggy==1.6.0",
+    "tomli==2.4.0; python_version < '3.11'",
+    "trove-classifiers==2026.1.14.14",
+]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/airflow_shared"]
+
+[tool.ruff]
+extend = "../../pyproject.toml"
+src = ["src"]
+
+[tool.ruff.lint.per-file-ignores]
+# Ignore Doc rules et al for anything outside of tests
+"!src/*" = ["D", "S101", "TRY002"]
diff --git a/shared/serialization/src/airflow_shared/serialization/__init__.py b/shared/serialization/src/airflow_shared/serialization/__init__.py
new file mode 100644
index 00000000000..a326d330ee3
--- /dev/null
+++ b/shared/serialization/src/airflow_shared/serialization/__init__.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+# Current serialization format keys
+CLASSNAME = "__classname__"
+VERSION = "__version__"
+DATA = "__data__"
+SCHEMA_ID = "__id__"
+CACHE = "__cache__"
+
+# Legacy serialization format keys (processed by serde for backwards compatibility)
+OLD_TYPE = "__type"
+OLD_SOURCE = "__source"
+OLD_DATA = "__var"
+OLD_DICT = "dict"
+
+FORBIDDEN_XCOM_KEYS = frozenset(
+    {
+        CLASSNAME,
+        VERSION,
+        DATA,
+        SCHEMA_ID,
+        CACHE,
+        OLD_TYPE,
+        OLD_SOURCE,
+        OLD_DATA,
+    }
+)
diff --git a/shared/serialization/tests/conftest.py b/shared/serialization/tests/conftest.py
new file mode 100644
index 00000000000..93aecf26184
--- /dev/null
+++ b/shared/serialization/tests/conftest.py
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+
+os.environ["_AIRFLOW__AS_LIBRARY"] = "true"
diff --git a/shared/serialization/tests/serialization/__init__.py b/shared/serialization/tests/serialization/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/shared/serialization/tests/serialization/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/shared/serialization/tests/serialization/test_constants.py b/shared/serialization/tests/serialization/test_constants.py
new file mode 100644
index 00000000000..f3a24e4af08
--- /dev/null
+++ b/shared/serialization/tests/serialization/test_constants.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+
+def test_placeholder():
+    """Placeholder test because this module doesn't have any logic, just constants."""
+    pass
diff --git a/task-sdk/pyproject.toml b/task-sdk/pyproject.toml
index 627c506f258..df06f6118c7 100644
--- a/task-sdk/pyproject.toml
+++ b/task-sdk/pyproject.toml
@@ -139,6 +139,7 @@ path = "src/airflow/sdk/__init__.py"
 "../shared/observability/src/airflow_shared/observability" = "src/airflow/sdk/_shared/observability"
 "../shared/secrets_backend/src/airflow_shared/secrets_backend" = "src/airflow/sdk/_shared/secrets_backend"
 "../shared/secrets_masker/src/airflow_shared/secrets_masker" = "src/airflow/sdk/_shared/secrets_masker"
+"../shared/serialization/src/airflow_shared/serialization" = "src/airflow/sdk/_shared/serialization"
 "../shared/timezones/src/airflow_shared/timezones" = "src/airflow/sdk/_shared/timezones"
 "../shared/listeners/src/airflow_shared/listeners" = "src/airflow/sdk/_shared/listeners"
 "../shared/plugins_manager/src/airflow_shared/plugins_manager" = "src/airflow/sdk/_shared/plugins_manager"
@@ -307,6 +308,7 @@ shared_distributions = [
     "apache-airflow-shared-module-loading",
     "apache-airflow-shared-secrets-backend",
     "apache-airflow-shared-secrets-masker",
+    "apache-airflow-shared-serialization",
     "apache-airflow-shared-timezones",
     "apache-airflow-shared-observability",
     "apache-airflow-shared-plugins-manager",
diff --git a/task-sdk/src/airflow/sdk/_shared/serialization b/task-sdk/src/airflow/sdk/_shared/serialization
new file mode 120000
index 00000000000..61b7dfc4b60
--- /dev/null
+++ b/task-sdk/src/airflow/sdk/_shared/serialization
@@ -0,0 +1 @@
+../../../../../shared/serialization/src/airflow_shared/serialization
\ No newline at end of file
diff --git a/task-sdk/src/airflow/sdk/serde/__init__.py b/task-sdk/src/airflow/sdk/serde/__init__.py
index c9536fd174c..9a216d2f11c 100644
--- a/task-sdk/src/airflow/sdk/serde/__init__.py
+++ b/task-sdk/src/airflow/sdk/serde/__init__.py
@@ -31,6 +31,15 @@ import attr
 
 from airflow.sdk._shared.module_loading import import_string, iter_namespace, qualname
 from airflow.sdk._shared.observability.metrics.stats import Stats
+from airflow.sdk._shared.serialization import (
+    CLASSNAME,
+    DATA,
+    OLD_DATA,
+    OLD_DICT,
+    OLD_TYPE,
+    SCHEMA_ID,
+    VERSION,
+)
 from airflow.sdk.configuration import conf
 from airflow.sdk.observability.metrics import stats_utils
 from airflow.sdk.serde.typing import is_pydantic_model
@@ -42,16 +51,6 @@ log = logging.getLogger(__name__)
 
 MAX_RECURSION_DEPTH = sys.getrecursionlimit() - 1
 
-CLASSNAME = "__classname__"
-VERSION = "__version__"
-DATA = "__data__"
-SCHEMA_ID = "__id__"
-CACHE = "__cache__"
-
-OLD_TYPE = "__type"
-OLD_SOURCE = "__source"
-OLD_DATA = "__var"
-OLD_DICT = "dict"
 PYDANTIC_MODEL_QUALNAME = "pydantic.main.BaseModel"
 
 DEFAULT_VERSION = 0
diff --git a/uv.lock b/uv.lock
index b79d7562ac9..0bf0d038d1e 100644
--- a/uv.lock
+++ b/uv.lock
@@ -135,6 +135,7 @@ members = [
     "apache-airflow-shared-providers-discovery",
     "apache-airflow-shared-secrets-backend",
     "apache-airflow-shared-secrets-masker",
+    "apache-airflow-shared-serialization",
     "apache-airflow-shared-template-rendering",
     "apache-airflow-shared-timezones",
     "apache-airflow-task-sdk",
@@ -1314,6 +1315,7 @@ dev = [
     { name = "apache-airflow-shared-providers-discovery" },
     { name = "apache-airflow-shared-secrets-backend" },
     { name = "apache-airflow-shared-secrets-masker" },
+    { name = "apache-airflow-shared-serialization" },
     { name = "apache-airflow-shared-template-rendering" },
     { name = "apache-airflow-shared-timezones" },
     { name = "apache-airflow-task-sdk", extra = ["all"] },
@@ -1581,6 +1583,7 @@ dev = [
     { name = "apache-airflow-shared-providers-discovery", editable = "shared/providers_discovery" },
     { name = "apache-airflow-shared-secrets-backend", editable = "shared/secrets_backend" },
     { name = "apache-airflow-shared-secrets-masker", editable = "shared/secrets_masker" },
+    { name = "apache-airflow-shared-serialization", editable = "shared/serialization" },
     { name = "apache-airflow-shared-template-rendering", editable = "shared/template_rendering" },
     { name = "apache-airflow-shared-timezones", editable = "shared/timezones" },
     { name = "apache-airflow-task-sdk", extras = ["all"], editable = "task-sdk" },
@@ -7991,6 +7994,21 @@ requires-dist = [
 [package.metadata.requires-dev]
 dev = [{ name = "apache-airflow-devel-common", editable = "devel-common" }]
 
+[[package]]
+name = "apache-airflow-shared-serialization"
+version = "0.0"
+source = { editable = "shared/serialization" }
+
+[package.dev-dependencies]
+dev = [
+    { name = "apache-airflow-devel-common" },
+]
+
+[package.metadata]
+
+[package.metadata.requires-dev]
+dev = [{ name = "apache-airflow-devel-common", editable = "devel-common" }]
+
 [[package]]
 name = "apache-airflow-shared-template-rendering"
 version = "0.0"
@@ -19798,9 +19816,9 @@ name = "secretstorage"
 version = "3.5.0"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
-    { name = "cryptography", version = "44.0.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.12'" },
+    { name = "cryptography", version = "44.0.3", source = { registry = "https://pypi.org/simple" }, marker = "(python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform == 'emscripten') or (python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform == 'win32') or (python_full_version >= '3.12' and sys_platform != 'emscripten' and sys_platform != 'win32')" },
     { name = "cryptography", version = "46.0.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.12'" },
-    { name = "jeepney" },
+    { name = "jeepney", marker = "(python_full_version < '3.14' and sys_platform == 'emscripten') or (python_full_version < '3.14' and sys_platform == 'win32') or (sys_platform != 'emscripten' and sys_platform != 'win32')" },
 ]
 sdist = { url = "https://files.pythonhosted.org/packages/1c/03/e834bcd866f2f8a49a85eaff47340affa3bfa391ee9912a952a1faa68c7b/secretstorage-3.5.0.tar.gz", hash = "sha256:f04b8e4689cbce351744d5537bf6b1329c6fc68f91fa666f60a380edddcd11be", size = 19884, upload-time = "2025-11-23T19:02:53.191Z" }
 wheels = [

Reply via email to