This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 98c1da8cc11 Removing `airflow.utils.xcom` references in airflow-core
(#53144)
98c1da8cc11 is described below
commit 98c1da8cc110a624e0f60dc00aa1e33b0b86d69f
Author: Amogh Desai <[email protected]>
AuthorDate: Fri Jul 11 12:40:23 2025 +0530
Removing `airflow.utils.xcom` references in airflow-core (#53144)
---
airflow-core/src/airflow/models/xcom.py | 9 +++------
airflow-core/src/airflow/utils/xcom.py | 1 +
airflow-core/tests/unit/models/test_taskinstance.py | 3 +--
airflow-core/tests/unit/models/test_trigger.py | 3 +--
airflow-core/tests/unit/models/test_xcom.py | 5 ++---
airflow-core/tests/unit/serialization/test_dag_serialization.py | 3 +--
.../tests/unit/ti_deps/deps/test_mapped_task_upstream_dep.py | 2 +-
7 files changed, 10 insertions(+), 16 deletions(-)
diff --git a/airflow-core/src/airflow/models/xcom.py
b/airflow-core/src/airflow/models/xcom.py
index 89bdbea17d3..badb86bd43d 100644
--- a/airflow-core/src/airflow/models/xcom.py
+++ b/airflow-core/src/airflow/models/xcom.py
@@ -47,12 +47,6 @@ from airflow.utils.json import XComDecoder, XComEncoder
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime
-# XCom constants below are needed for providers backward compatibility,
-# which should import the constants directly after apache-airflow>=2.6.0
-from airflow.utils.xcom import (
- XCOM_RETURN_KEY,
-)
-
log = logging.getLogger(__name__)
if TYPE_CHECKING:
@@ -61,6 +55,9 @@ if TYPE_CHECKING:
from sqlalchemy.sql.expression import Select, TextClause
+XCOM_RETURN_KEY = "return_value"
+
+
class XComModel(TaskInstanceDependencies):
"""XCom model class. Contains table and some utilities."""
diff --git a/airflow-core/src/airflow/utils/xcom.py
b/airflow-core/src/airflow/utils/xcom.py
index fde313601fd..f65be31a834 100644
--- a/airflow-core/src/airflow/utils/xcom.py
+++ b/airflow-core/src/airflow/utils/xcom.py
@@ -20,4 +20,5 @@
# https://github.com/apache/airflow/pull/1618#discussion_r68249677
from __future__ import annotations
+# TODO: Remove this once all the providers have been moved to not use this
import
XCOM_RETURN_KEY = "return_value"
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py
b/airflow-core/tests/unit/models/test_taskinstance.py
index aecffe06e8e..d1b3a750db3 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -85,7 +85,6 @@ from airflow.utils.span_status import SpanStatus
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.task_group import TaskGroup
from airflow.utils.types import DagRunTriggeredByType, DagRunType
-from airflow.utils.xcom import XCOM_RETURN_KEY
from tests_common.test_utils import db
from tests_common.test_utils.db import clear_db_connections, clear_db_runs
@@ -1302,7 +1301,7 @@ class TestTaskInstance:
ti =
dag_maker.create_dagrun(logical_date=timezone.utcnow()).task_instances[0]
ti.task = task
ti.run()
- assert ti.xcom_pull(task_ids=task_id, key=XCOM_RETURN_KEY) is None
+ assert ti.xcom_pull(task_ids=task_id) is None
def test_check_and_change_state_before_execution(self,
create_task_instance, testing_dag_bundle):
expected_external_executor_id = "banana"
diff --git a/airflow-core/tests/unit/models/test_trigger.py
b/airflow-core/tests/unit/models/test_trigger.py
index 83e7886b793..8fd489aca55 100644
--- a/airflow-core/tests/unit/models/test_trigger.py
+++ b/airflow-core/tests/unit/models/test_trigger.py
@@ -44,7 +44,6 @@ from airflow.triggers.base import (
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State
-from airflow.utils.xcom import XCOM_RETURN_KEY
from tests_common.test_utils.config import conf_vars
@@ -243,7 +242,7 @@ def test_submit_event_task_end(mock_utcnow, session,
create_task_instance, event
# now, for each type, submit event
# verify that (1) task ends in right state and (2) xcom is pushed
Trigger.submit_event(
- trigger.id, event_cls(xcoms={XCOM_RETURN_KEY: "xcomret", "a": "b",
"c": "d"}), session=session
+ trigger.id, event_cls(xcoms={"return_value": "xcomret", "a": "b", "c":
"d"}), session=session
)
# commit changes made by submit event and expire all cache to read from db.
session.flush()
diff --git a/airflow-core/tests/unit/models/test_xcom.py
b/airflow-core/tests/unit/models/test_xcom.py
index 914271f9386..566d74ceaa7 100644
--- a/airflow-core/tests/unit/models/test_xcom.py
+++ b/airflow-core/tests/unit/models/test_xcom.py
@@ -37,7 +37,6 @@ from airflow.sdk.execution_time.xcom import
resolve_xcom_backend
from airflow.settings import json
from airflow.utils import timezone
from airflow.utils.session import create_session
-from airflow.utils.xcom import XCOM_RETURN_KEY
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.markers import
skip_if_force_lowest_dependencies_marker
@@ -173,7 +172,7 @@ class TestXCom:
XCom = resolve_xcom_backend()
XCom.set(
- key=XCOM_RETURN_KEY,
+ key=XCom.XCOM_RETURN_KEY,
value={"my_xcom_key": "my_xcom_value"},
dag_id=task_instance.dag_id,
task_id=task_instance.task_id,
@@ -181,7 +180,7 @@ class TestXCom:
map_index=-1,
)
serialize_watcher.assert_called_once_with(
- key=XCOM_RETURN_KEY,
+ key=XCom.XCOM_RETURN_KEY,
value={"my_xcom_key": "my_xcom_value"},
dag_id=task_instance.dag_id,
task_id=task_instance.task_id,
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py
b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index ad15025bc70..b6bd542741e 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -59,7 +59,7 @@ from airflow.models.connection import Connection
from airflow.models.dag import DAG
from airflow.models.dagbag import DagBag
from airflow.models.mappedoperator import MappedOperator
-from airflow.models.xcom import XComModel
+from airflow.models.xcom import XCOM_RETURN_KEY, XComModel
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.sensors.bash import BashSensor
@@ -85,7 +85,6 @@ from airflow.utils import timezone
from airflow.utils.module_loading import qualname
from airflow.utils.operator_resources import Resources
from airflow.utils.task_group import TaskGroup
-from airflow.utils.xcom import XCOM_RETURN_KEY
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.markers import
skip_if_force_lowest_dependencies_marker
diff --git
a/airflow-core/tests/unit/ti_deps/deps/test_mapped_task_upstream_dep.py
b/airflow-core/tests/unit/ti_deps/deps/test_mapped_task_upstream_dep.py
index ad8dfa081ad..65f85e0084d 100644
--- a/airflow-core/tests/unit/ti_deps/deps/test_mapped_task_upstream_dep.py
+++ b/airflow-core/tests/unit/ti_deps/deps/test_mapped_task_upstream_dep.py
@@ -22,12 +22,12 @@ from typing import TYPE_CHECKING
import pytest
from airflow.exceptions import AirflowFailException, AirflowSkipException
+from airflow.models.xcom import XCOM_RETURN_KEY
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.deps.base_ti_dep import TIDepStatus
from airflow.ti_deps.deps.mapped_task_upstream_dep import MappedTaskUpstreamDep
from airflow.utils.state import TaskInstanceState
-from airflow.utils.xcom import XCOM_RETURN_KEY
pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]