This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0ed72b480c5 Refactor xcom API to use shared serialisation constants (#64148)
0ed72b480c5 is described below
commit 0ed72b480c572f5dab81084530b79fb89589dce2
Author: Amogh Desai <[email protected]>
AuthorDate: Fri Mar 27 13:51:06 2026 +0530
Refactor xcom API to use shared serialisation constants (#64148)
---
airflow-core/pyproject.toml | 2 +
airflow-core/src/airflow/_shared/serialization | 1 +
.../api_fastapi/core_api/datamodels/xcom.py | 23 ++++++++++
.../core_api/routes/public/test_xcom.py | 19 ++++++++
pyproject.toml | 3 ++
shared/serialization/.gitignore | 1 +
shared/serialization/pyproject.toml | 53 ++++++++++++++++++++++
.../src/airflow_shared/serialization/__init__.py | 43 ++++++++++++++++++
shared/serialization/tests/conftest.py | 21 +++++++++
.../serialization/tests/serialization/__init__.py | 16 +++++++
.../tests/serialization/test_constants.py | 22 +++++++++
task-sdk/pyproject.toml | 2 +
task-sdk/src/airflow/sdk/_shared/serialization | 1 +
task-sdk/src/airflow/sdk/serde/__init__.py | 19 ++++----
uv.lock | 22 ++++++++-
15 files changed, 236 insertions(+), 12 deletions(-)
diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml
index d2e586e58bb..14e7bf266c4 100644
--- a/airflow-core/pyproject.toml
+++ b/airflow-core/pyproject.toml
@@ -248,6 +248,7 @@ exclude = [
"../shared/observability/src/airflow_shared/observability" =
"src/airflow/_shared/observability"
"../shared/secrets_backend/src/airflow_shared/secrets_backend" =
"src/airflow/_shared/secrets_backend"
"../shared/secrets_masker/src/airflow_shared/secrets_masker" =
"src/airflow/_shared/secrets_masker"
+"../shared/serialization/src/airflow_shared/serialization" =
"src/airflow/_shared/serialization"
"../shared/timezones/src/airflow_shared/timezones" =
"src/airflow/_shared/timezones"
"../shared/listeners/src/airflow_shared/listeners" =
"src/airflow/_shared/listeners"
"../shared/plugins_manager/src/airflow_shared/plugins_manager" =
"src/airflow/_shared/plugins_manager"
@@ -330,6 +331,7 @@ shared_distributions = [
"apache-airflow-shared-observability",
"apache-airflow-shared-secrets-backend",
"apache-airflow-shared-secrets-masker",
+ "apache-airflow-shared-serialization",
"apache-airflow-shared-timezones",
"apache-airflow-shared-plugins-manager",
"apache-airflow-shared-providers-discovery",
diff --git a/airflow-core/src/airflow/_shared/serialization
b/airflow-core/src/airflow/_shared/serialization
new file mode 120000
index 00000000000..b730c8e6b47
--- /dev/null
+++ b/airflow-core/src/airflow/_shared/serialization
@@ -0,0 +1 @@
+../../../../shared/serialization/src/airflow_shared/serialization
\ No newline at end of file
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py
index ad18de4be34..05cbb47c36c 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/xcom.py
@@ -90,6 +90,29 @@ class XComCreateBody(StrictBaseModel):
value: Any
map_index: int = -1
+ @field_validator("value")
+ @classmethod
+ def _check_forbidden_keys(cls, value: Any) -> Any:
+ """Recursively check for forbidden deserialization keys in user-provided XCom data."""
+ from airflow._shared.serialization import FORBIDDEN_XCOM_KEYS
+
+ def _walk_forbidden_keys(obj: Any, path: str = "value") -> None:
+ if isinstance(obj, dict):
+ found = FORBIDDEN_XCOM_KEYS & obj.keys()
+ if found:
+ raise ValueError(
+ f"XCom {path} contains reserved serialization keys: {', '.join(sorted(found))}. "
+ f"These keys are reserved for internal use."
+ )
+ for k, v in obj.items():
+ _walk_forbidden_keys(v, f"{path}.{k}")
+ elif isinstance(obj, (list, tuple)):
+ for i, item in enumerate(obj):
+ _walk_forbidden_keys(item, f"{path}[{i}]")
+
+ _walk_forbidden_keys(value)
+ return value
+
class XComUpdateBody(StrictBaseModel):
"""Payload serializer for updating an XCom entry."""
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
index b8d06eae613..c51469d944e 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
@@ -718,6 +718,25 @@ class TestCreateXComEntry(TestXComEndpoint):
assert get_resp.json()["key"] == slash_key
assert get_resp.json()["value"] == json.dumps(TEST_XCOM_VALUE)
+ @pytest.mark.parametrize(
+ ("key", "value"),
+ [
+ ("__classname__", {"__classname__": "airflow.sdk.definitions.connection.Connection"}),
+ ("__type", {"__type": "airflow.sdk.definitions.connection.Connection", "__var": {}}),
+ ("__data__", {"nested": {"__data__": "malicious"}}),
+ ],
+ )
+ def test_create_xcom_entry_blocks_forbidden_keys(self, test_client, key, value):
+ """Test that XCom creation blocks deserialization metadata keys."""
+ response = test_client.post(
+ f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries",
+ json={"key": "test_key", "value": value, "map_index": -1},
+ )
+ assert response.status_code == 422
+ detail = str(response.json()["detail"])
+ assert "reserved serialization keys" in detail
+ assert key in detail
+
class TestDeleteXComEntry(TestXComEndpoint):
def test_delete_xcom_entry(self, test_client, session):
diff --git a/pyproject.toml b/pyproject.toml
index 4adfd24cb51..79938c84a0e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1309,6 +1309,7 @@ dev = [
"apache-airflow-shared-providers-discovery",
"apache-airflow-shared-secrets-backend",
"apache-airflow-shared-secrets-masker",
+ "apache-airflow-shared-serialization",
"apache-airflow-shared-template-rendering",
"apache-airflow-shared-timezones",
]
@@ -1371,6 +1372,7 @@ apache-airflow-shared-plugins-manager = { workspace =
true }
apache-airflow-shared-providers-discovery = { workspace = true }
apache-airflow-shared-secrets-backend = { workspace = true }
apache-airflow-shared-secrets-masker = { workspace = true }
+apache-airflow-shared-serialization = { workspace = true }
apache-airflow-shared-template-rendering = { workspace = true }
apache-airflow-shared-timezones = { workspace = true }
# Automatically generated provider workspace items
(update_airflow_pyproject_toml.py)
@@ -1503,6 +1505,7 @@ members = [
"shared/providers_discovery",
"shared/secrets_backend",
"shared/secrets_masker",
+ "shared/serialization",
"shared/template_rendering",
"shared/timezones",
# Automatically generated provider workspace members
(update_airflow_pyproject_toml.py)
diff --git a/shared/serialization/.gitignore b/shared/serialization/.gitignore
new file mode 100644
index 00000000000..bff2d762960
--- /dev/null
+++ b/shared/serialization/.gitignore
@@ -0,0 +1 @@
+*.iml
diff --git a/shared/serialization/pyproject.toml
b/shared/serialization/pyproject.toml
new file mode 100644
index 00000000000..343bb1c34b1
--- /dev/null
+++ b/shared/serialization/pyproject.toml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[project]
+name = "apache-airflow-shared-serialization"
+description = "Shared serialization constants for Airflow distributions"
+version = "0.0"
+classifiers = [
+ "Private :: Do Not Upload",
+]
+
+dependencies = []
+
+[dependency-groups]
+dev = [
+ "apache-airflow-devel-common",
+]
+
+[build-system]
+requires = [
+ "hatchling==1.29.0",
+ "packaging==26.0",
+ "pathspec==1.0.4",
+ "pluggy==1.6.0",
+ "tomli==2.4.0; python_version < '3.11'",
+ "trove-classifiers==2026.1.14.14",
+]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/airflow_shared"]
+
+[tool.ruff]
+extend = "../../pyproject.toml"
+src = ["src"]
+
+[tool.ruff.lint.per-file-ignores]
+# Ignore Doc rules et al for anything outside of tests
+"!src/*" = ["D", "S101", "TRY002"]
diff --git a/shared/serialization/src/airflow_shared/serialization/__init__.py
b/shared/serialization/src/airflow_shared/serialization/__init__.py
new file mode 100644
index 00000000000..a326d330ee3
--- /dev/null
+++ b/shared/serialization/src/airflow_shared/serialization/__init__.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+# Current serialization format keys
+CLASSNAME = "__classname__"
+VERSION = "__version__"
+DATA = "__data__"
+SCHEMA_ID = "__id__"
+CACHE = "__cache__"
+
+# Legacy serialization format keys (processed by serde for backwards compatibility)
+OLD_TYPE = "__type"
+OLD_SOURCE = "__source"
+OLD_DATA = "__var"
+OLD_DICT = "dict"
+
+FORBIDDEN_XCOM_KEYS = frozenset(
+ {
+ CLASSNAME,
+ VERSION,
+ DATA,
+ SCHEMA_ID,
+ CACHE,
+ OLD_TYPE,
+ OLD_SOURCE,
+ OLD_DATA,
+ }
+)
diff --git a/shared/serialization/tests/conftest.py
b/shared/serialization/tests/conftest.py
new file mode 100644
index 00000000000..93aecf26184
--- /dev/null
+++ b/shared/serialization/tests/conftest.py
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+
+os.environ["_AIRFLOW__AS_LIBRARY"] = "true"
diff --git a/shared/serialization/tests/serialization/__init__.py
b/shared/serialization/tests/serialization/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/shared/serialization/tests/serialization/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/shared/serialization/tests/serialization/test_constants.py
b/shared/serialization/tests/serialization/test_constants.py
new file mode 100644
index 00000000000..f3a24e4af08
--- /dev/null
+++ b/shared/serialization/tests/serialization/test_constants.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+
+def test_placeholder():
+ """Placeholder test because this module doesn't have any logic, just constants."""
+ pass
diff --git a/task-sdk/pyproject.toml b/task-sdk/pyproject.toml
index 627c506f258..df06f6118c7 100644
--- a/task-sdk/pyproject.toml
+++ b/task-sdk/pyproject.toml
@@ -139,6 +139,7 @@ path = "src/airflow/sdk/__init__.py"
"../shared/observability/src/airflow_shared/observability" =
"src/airflow/sdk/_shared/observability"
"../shared/secrets_backend/src/airflow_shared/secrets_backend" =
"src/airflow/sdk/_shared/secrets_backend"
"../shared/secrets_masker/src/airflow_shared/secrets_masker" =
"src/airflow/sdk/_shared/secrets_masker"
+"../shared/serialization/src/airflow_shared/serialization" =
"src/airflow/sdk/_shared/serialization"
"../shared/timezones/src/airflow_shared/timezones" =
"src/airflow/sdk/_shared/timezones"
"../shared/listeners/src/airflow_shared/listeners" =
"src/airflow/sdk/_shared/listeners"
"../shared/plugins_manager/src/airflow_shared/plugins_manager" =
"src/airflow/sdk/_shared/plugins_manager"
@@ -307,6 +308,7 @@ shared_distributions = [
"apache-airflow-shared-module-loading",
"apache-airflow-shared-secrets-backend",
"apache-airflow-shared-secrets-masker",
+ "apache-airflow-shared-serialization",
"apache-airflow-shared-timezones",
"apache-airflow-shared-observability",
"apache-airflow-shared-plugins-manager",
diff --git a/task-sdk/src/airflow/sdk/_shared/serialization
b/task-sdk/src/airflow/sdk/_shared/serialization
new file mode 120000
index 00000000000..61b7dfc4b60
--- /dev/null
+++ b/task-sdk/src/airflow/sdk/_shared/serialization
@@ -0,0 +1 @@
+../../../../../shared/serialization/src/airflow_shared/serialization
\ No newline at end of file
diff --git a/task-sdk/src/airflow/sdk/serde/__init__.py
b/task-sdk/src/airflow/sdk/serde/__init__.py
index c9536fd174c..9a216d2f11c 100644
--- a/task-sdk/src/airflow/sdk/serde/__init__.py
+++ b/task-sdk/src/airflow/sdk/serde/__init__.py
@@ -31,6 +31,15 @@ import attr
from airflow.sdk._shared.module_loading import import_string, iter_namespace,
qualname
from airflow.sdk._shared.observability.metrics.stats import Stats
+from airflow.sdk._shared.serialization import (
+ CLASSNAME,
+ DATA,
+ OLD_DATA,
+ OLD_DICT,
+ OLD_TYPE,
+ SCHEMA_ID,
+ VERSION,
+)
from airflow.sdk.configuration import conf
from airflow.sdk.observability.metrics import stats_utils
from airflow.sdk.serde.typing import is_pydantic_model
@@ -42,16 +51,6 @@ log = logging.getLogger(__name__)
MAX_RECURSION_DEPTH = sys.getrecursionlimit() - 1
-CLASSNAME = "__classname__"
-VERSION = "__version__"
-DATA = "__data__"
-SCHEMA_ID = "__id__"
-CACHE = "__cache__"
-
-OLD_TYPE = "__type"
-OLD_SOURCE = "__source"
-OLD_DATA = "__var"
-OLD_DICT = "dict"
PYDANTIC_MODEL_QUALNAME = "pydantic.main.BaseModel"
DEFAULT_VERSION = 0
diff --git a/uv.lock b/uv.lock
index b79d7562ac9..0bf0d038d1e 100644
--- a/uv.lock
+++ b/uv.lock
@@ -135,6 +135,7 @@ members = [
"apache-airflow-shared-providers-discovery",
"apache-airflow-shared-secrets-backend",
"apache-airflow-shared-secrets-masker",
+ "apache-airflow-shared-serialization",
"apache-airflow-shared-template-rendering",
"apache-airflow-shared-timezones",
"apache-airflow-task-sdk",
@@ -1314,6 +1315,7 @@ dev = [
{ name = "apache-airflow-shared-providers-discovery" },
{ name = "apache-airflow-shared-secrets-backend" },
{ name = "apache-airflow-shared-secrets-masker" },
+ { name = "apache-airflow-shared-serialization" },
{ name = "apache-airflow-shared-template-rendering" },
{ name = "apache-airflow-shared-timezones" },
{ name = "apache-airflow-task-sdk", extra = ["all"] },
@@ -1581,6 +1583,7 @@ dev = [
{ name = "apache-airflow-shared-providers-discovery", editable =
"shared/providers_discovery" },
{ name = "apache-airflow-shared-secrets-backend", editable =
"shared/secrets_backend" },
{ name = "apache-airflow-shared-secrets-masker", editable =
"shared/secrets_masker" },
+ { name = "apache-airflow-shared-serialization", editable =
"shared/serialization" },
{ name = "apache-airflow-shared-template-rendering", editable =
"shared/template_rendering" },
{ name = "apache-airflow-shared-timezones", editable = "shared/timezones"
},
{ name = "apache-airflow-task-sdk", extras = ["all"], editable =
"task-sdk" },
@@ -7991,6 +7994,21 @@ requires-dist = [
[package.metadata.requires-dev]
dev = [{ name = "apache-airflow-devel-common", editable = "devel-common" }]
+[[package]]
+name = "apache-airflow-shared-serialization"
+version = "0.0"
+source = { editable = "shared/serialization" }
+
+[package.dev-dependencies]
+dev = [
+ { name = "apache-airflow-devel-common" },
+]
+
+[package.metadata]
+
+[package.metadata.requires-dev]
+dev = [{ name = "apache-airflow-devel-common", editable = "devel-common" }]
+
[[package]]
name = "apache-airflow-shared-template-rendering"
version = "0.0"
@@ -19798,9 +19816,9 @@ name = "secretstorage"
version = "3.5.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
- { name = "cryptography", version = "44.0.3", source = { registry =
"https://pypi.org/simple" }, marker = "python_full_version >= '3.12'" },
+ { name = "cryptography", version = "44.0.3", source = { registry =
"https://pypi.org/simple" }, marker = "(python_full_version >= '3.12' and
python_full_version < '3.14' and sys_platform == 'emscripten') or
(python_full_version >= '3.12' and python_full_version < '3.14' and
sys_platform == 'win32') or (python_full_version >= '3.12' and sys_platform !=
'emscripten' and sys_platform != 'win32')" },
{ name = "cryptography", version = "46.0.6", source = { registry =
"https://pypi.org/simple" }, marker = "python_full_version < '3.12'" },
- { name = "jeepney" },
+ { name = "jeepney", marker = "(python_full_version < '3.14' and
sys_platform == 'emscripten') or (python_full_version < '3.14' and sys_platform
== 'win32') or (sys_platform != 'emscripten' and sys_platform != 'win32')" },
]
sdist = { url =
"https://files.pythonhosted.org/packages/1c/03/e834bcd866f2f8a49a85eaff47340affa3bfa391ee9912a952a1faa68c7b/secretstorage-3.5.0.tar.gz",
hash =
"sha256:f04b8e4689cbce351744d5537bf6b1329c6fc68f91fa666f60a380edddcd11be", size
= 19884, upload-time = "2025-11-23T19:02:53.191Z" }
wheels = [