amoghrajesh opened a new issue, #47195: URL: https://github.com/apache/airflow/issues/47195
### Body As part of [AIP-72: Simplify the XCOM interface between task sdk and execution API #46719](https://github.com/apache/airflow/pull/46719), we simplified the interactions between the task runner and the task execution API so that they exchange JSON-supported objects and not serialised JSON strings. This leads to a side effect -- tuples aren't handled well. The kind of error we run into (this is in the task execution API server): ``` │ │ │ /opt/airflow/airflow/api_fastapi/execution_api/routes/xcoms.py:228 in set_xcom │ │ │ │ 225 │ │ │ 226 │ # We use `BaseXCom.set` to set XComs directly to the database, bypassing the XCom Ba │ │ 227 │ try: │ │ ❱ 228 │ │ BaseXCom.set( │ │ 229 │ │ │ key=key, │ │ 230 │ │ │ value=value, │ │ 231 │ │ │ dag_id=dag_id, │ │ │ │ ╭─────────────────────────────────────── locals ───────────────────────────────────────╮ │ │ │ conf = <airflow.configuration.AirflowConfigParser object at 0xffff8a1c9580> │ │ │ │ dag_id = 'xcom_tuple_return' │ │ │ │ key = 'return_value' │ │ │ │ map_index = -1 │ │ │ │ mapped_length = None │ │ │ │ run_id = 'manual__2025-02-28T06:52:36.257667+00:00_AI3PEQTq' │ │ │ │ session = <sqlalchemy.orm.session.Session object at 0xffff59d667f0> │ │ │ │ task_id = 'tuple_task' │ │ │ │ token = TIToken(ti_key='test_key') │ │ │ │ value = { │ │ │ │ │ '__classname__': 'builtins.tuple', │ │ │ │ │ '__version__': 1, │ │ │ │ │ '__data__': ['Hello', 'XCom!'] │ │ │ │ } │ │ │ ╰──────────────────────────────────────────────────────────────────────────────────────╯ │ │ │ │ /opt/airflow/airflow/utils/session.py:98 in wrapper │ │ │ │ 95 │ @wraps(func) │ │ 96 │ def wrapper(*args, **kwargs) -> RT: │ │ 97 │ │ if "session" in kwargs or session_args_idx < len(args): │ │ ❱ 98 │ │ │ return func(*args, **kwargs) │ │ 99 │ │ else: │ │ 100 │ │ │ with create_session() as session: │ │ 101 │ │ │ │ return func(*args, session=session, **kwargs) │ │ │ │ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ │ │ args = 
(<class 'airflow.models.xcom.BaseXCom'>,) │ │ │ │ kwargs = { │ │ │ │ │ 'key': 'return_value', │ │ │ │ │ 'value': { │ │ │ │ │ │ '__classname__': 'builtins.tuple', │ │ │ │ │ │ '__version__': 1, │ │ │ │ │ │ '__data__': ['Hello', 'XCom!'] │ │ │ │ │ }, │ │ │ │ │ 'dag_id': 'xcom_tuple_return', │ │ │ │ │ 'task_id': 'tuple_task', │ │ │ │ │ 'run_id': 'manual__2025-02-28T06:52:36.257667+00:00_AI3PEQTq', │ │ │ │ │ 'session': <sqlalchemy.orm.session.Session object at 0xffff59d667f0>, │ │ │ │ │ 'map_index': -1 │ │ │ │ } │ │ │ │ session_args_idx = 7 │ │ │ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ │ │ │ /opt/airflow/airflow/models/xcom.py:185 in set │ │ │ │ 182 │ │ │ ) │ │ 183 │ │ │ value = list(value) │ │ 184 │ │ │ │ ❱ 185 │ │ value = cls.serialize_value( │ │ 186 │ │ │ value=value, │ │ 187 │ │ │ key=key, │ │ 188 │ │ │ task_id=task_id, │ │ │ │ ╭──────────────────────────────── locals ────────────────────────────────╮ │ │ │ dag_id = 'xcom_tuple_return' │ │ │ │ dag_run_id = 1 │ │ │ │ key = 'return_value' │ │ │ │ map_index = -1 │ │ │ │ run_id = 'manual__2025-02-28T06:52:36.257667+00:00_AI3PEQTq' │ │ │ │ session = <sqlalchemy.orm.session.Session object at 0xffff59d667f0> │ │ │ │ task_id = 'tuple_task' │ │ │ │ value = { │ │ │ │ │ '__classname__': 'builtins.tuple', │ │ │ │ │ '__version__': 1, │ │ │ │ │ '__data__': ['Hello', 'XCom!'] │ │ │ │ } │ │ │ ╰────────────────────────────────────────────────────────────────────────╯ │ │ │ │ /opt/airflow/airflow/models/xcom.py:452 in serialize_value │ │ │ │ 449 │ ) -> str: │ │ 450 │ │ """Serialize XCom value to JSON str.""" │ │ 451 │ │ try: │ │ ❱ 452 │ │ │ return json.dumps(value, cls=XComEncoder) │ │ 453 │ │ except (ValueError, TypeError): │ │ 454 │ │ │ raise ValueError("XCom value must be JSON serializable") │ │ 455 │ │ │ │ ╭──────────────────────────── locals ─────────────────────────────╮ │ │ │ dag_id = 'xcom_tuple_return' │ │ │ │ key = 'return_value' │ │ │ │ map_index = -1 │ │ │ │ 
run_id = 'manual__2025-02-28T06:52:36.257667+00:00_AI3PEQTq' │ │ │ │ task_id = 'tuple_task' │ │ │ │ value = { │ │ │ │ │ '__classname__': 'builtins.tuple', │ │ │ │ │ '__version__': 1, │ │ │ │ │ '__data__': ['Hello', 'XCom!'] │ │ │ │ } │ │ │ ╰─────────────────────────────────────────────────────────────────╯ │ │ │ │ /usr/local/lib/python3.9/json/__init__.py:234 in dumps │ │ │ │ 231 │ │ return _default_encoder.encode(obj) │ │ 232 │ if cls is None: │ │ 233 │ │ cls = JSONEncoder │ │ ❱ 234 │ return cls( │ │ 235 │ │ skipkeys=skipkeys, ensure_ascii=ensure_ascii, │ │ 236 │ │ check_circular=check_circular, allow_nan=allow_nan, indent=indent, │ │ 237 │ │ separators=separators, default=default, sort_keys=sort_keys, │ │ │ │ ╭──────────────────────── locals ─────────────────────────╮ │ │ │ allow_nan = True │ │ │ │ check_circular = True │ │ │ │ default = None │ │ │ │ ensure_ascii = True │ │ │ │ indent = None │ │ │ │ kw = {} │ │ │ │ obj = { │ │ │ │ │ '__classname__': 'builtins.tuple', │ │ │ │ │ '__version__': 1, │ │ │ │ │ '__data__': ['Hello', 'XCom!'] │ │ │ │ } │ │ │ │ separators = None │ │ │ │ skipkeys = False │ │ │ │ sort_keys = False │ │ │ ╰─────────────────────────────────────────────────────────╯ │ │ │ │ /opt/airflow/airflow/utils/json.py:99 in encode │ │ │ │ 96 │ def encode(self, o: Any) -> str: │ │ 97 │ │ # checked here and in serialize │ │ 98 │ │ if isinstance(o, dict) and (CLASSNAME in o or SCHEMA_ID in o): │ │ ❱ 99 │ │ │ raise AttributeError(f"reserved key {CLASSNAME} found in dict to serialize") │ │ 100 │ │ │ │ 101 │ │ # tuples are not preserved by std python serializer │ │ 102 │ │ if isinstance(o, tuple): │ │ │ │ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ │ │ o = {'__classname__': 'builtins.tuple', '__version__': 1, '__data__': ['Hello', 'XCom!']} │ │ │ │ self = <airflow.utils.json.XComEncoder object at 0xffff599de6a0> │ │ │ 
╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ ╰──────────────────────────────────────────────────────────────────────────────────────────────────╯ AttributeError: reserved key __classname__ found in dict to serialize ``` To reproduce, use any simple DAG whose task returns a tuple as its XCom value. Example: ``` from airflow import DAG from airflow.providers.standard.operators.python import PythonOperator def push_to_xcom(**kwargs): value = ("Hello", "XCom!") return value with DAG( 'xcom_tuple_return', schedule=None, catchup=False, ) as dag: push_xcom_task = PythonOperator( task_id='tuple_task', python_callable=push_to_xcom, ) ``` ### Committer - [x] I acknowledge that I am a maintainer/committer of the Apache Airflow project. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
