amoghrajesh opened a new issue, #47195:
URL: https://github.com/apache/airflow/issues/47195

   ### Body
   
   As part of [AIP-72: Simplify the XCOM interface between task sdk and 
execution API #46719](https://github.com/apache/airflow/pull/46719), we 
simplified the interactions between the task runner and the task execution API 
so that they exchange JSON-compatible objects rather than serialized JSON strings. 
   
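   This has a side effect: tuples aren't handled well. The tuple reaches the 
execution API server already encoded as a dict containing the reserved 
`__classname__` key, and `XComEncoder` rejects that dict when `BaseXCom.set` 
serializes the value again.
   
   The error we run into (raised in the task execution API server):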
   ```
   /opt/airflow/airflow/api_fastapi/execution_api/routes/xcoms.py:228 in set_xcom

     225 │
     226 │   # We use `BaseXCom.set` to set XComs directly to the database, bypassing the XCom Ba
     227 │   try:
   ❱ 228 │   │   BaseXCom.set(
     229 │   │   │   key=key,
     230 │   │   │   value=value,
     231 │   │   │   dag_id=dag_id,

     ── locals ──
              conf = <airflow.configuration.AirflowConfigParser object at 0xffff8a1c9580>
            dag_id = 'xcom_tuple_return'
               key = 'return_value'
         map_index = -1
     mapped_length = None
            run_id = 'manual__2025-02-28T06:52:36.257667+00:00_AI3PEQTq'
           session = <sqlalchemy.orm.session.Session object at 0xffff59d667f0>
           task_id = 'tuple_task'
             token = TIToken(ti_key='test_key')
             value = {
                     │   '__classname__': 'builtins.tuple',
                     │   '__version__': 1,
                     │   '__data__': ['Hello', 'XCom!']
                     }

   /opt/airflow/airflow/utils/session.py:98 in wrapper

      95 │   @wraps(func)
      96 │   def wrapper(*args, **kwargs) -> RT:
      97 │   │   if "session" in kwargs or session_args_idx < len(args):
   ❱  98 │   │   │   return func(*args, **kwargs)
      99 │   │   else:
     100 │   │   │   with create_session() as session:
     101 │   │   │   │   return func(*args, session=session, **kwargs)

     ── locals ──
                 args = (<class 'airflow.models.xcom.BaseXCom'>,)
               kwargs = {
                        │   'key': 'return_value',
                        │   'value': {
                        │   │   '__classname__': 'builtins.tuple',
                        │   │   '__version__': 1,
                        │   │   '__data__': ['Hello', 'XCom!']
                        │   },
                        │   'dag_id': 'xcom_tuple_return',
                        │   'task_id': 'tuple_task',
                        │   'run_id': 'manual__2025-02-28T06:52:36.257667+00:00_AI3PEQTq',
                        │   'session': <sqlalchemy.orm.session.Session object at 0xffff59d667f0>,
                        │   'map_index': -1
                        }
     session_args_idx = 7

   /opt/airflow/airflow/models/xcom.py:185 in set

     182 │   │   │   )
     183 │   │   │   value = list(value)
     184 │   │
   ❱ 185 │   │   value = cls.serialize_value(
     186 │   │   │   value=value,
     187 │   │   │   key=key,
     188 │   │   │   task_id=task_id,

     ── locals ──
         dag_id = 'xcom_tuple_return'
     dag_run_id = 1
            key = 'return_value'
      map_index = -1
         run_id = 'manual__2025-02-28T06:52:36.257667+00:00_AI3PEQTq'
        session = <sqlalchemy.orm.session.Session object at 0xffff59d667f0>
        task_id = 'tuple_task'
          value = {
                  │   '__classname__': 'builtins.tuple',
                  │   '__version__': 1,
                  │   '__data__': ['Hello', 'XCom!']
                  }

   /opt/airflow/airflow/models/xcom.py:452 in serialize_value

     449 │   ) -> str:
     450 │   │   """Serialize XCom value to JSON str."""
     451 │   │   try:
   ❱ 452 │   │   │   return json.dumps(value, cls=XComEncoder)
     453 │   │   except (ValueError, TypeError):
     454 │   │   │   raise ValueError("XCom value must be JSON serializable")
     455

     ── locals ──
        dag_id = 'xcom_tuple_return'
           key = 'return_value'
     map_index = -1
        run_id = 'manual__2025-02-28T06:52:36.257667+00:00_AI3PEQTq'
       task_id = 'tuple_task'
         value = {
                 │   '__classname__': 'builtins.tuple',
                 │   '__version__': 1,
                 │   '__data__': ['Hello', 'XCom!']
                 }

   /usr/local/lib/python3.9/json/__init__.py:234 in dumps

     231 │   │   return _default_encoder.encode(obj)
     232 │   if cls is None:
     233 │   │   cls = JSONEncoder
   ❱ 234 │   return cls(
     235 │   │   skipkeys=skipkeys, ensure_ascii=ensure_ascii,
     236 │   │   check_circular=check_circular, allow_nan=allow_nan, indent=indent,
     237 │   │   separators=separators, default=default, sort_keys=sort_keys,

     ── locals ──
          allow_nan = True
     check_circular = True
            default = None
       ensure_ascii = True
             indent = None
                 kw = {}
                obj = {
                      │   '__classname__': 'builtins.tuple',
                      │   '__version__': 1,
                      │   '__data__': ['Hello', 'XCom!']
                      }
         separators = None
           skipkeys = False
          sort_keys = False

   /opt/airflow/airflow/utils/json.py:99 in encode

      96 │   def encode(self, o: Any) -> str:
      97 │   │   # checked here and in serialize
      98 │   │   if isinstance(o, dict) and (CLASSNAME in o or SCHEMA_ID in o):
   ❱  99 │   │   │   raise AttributeError(f"reserved key {CLASSNAME} found in dict to serialize")
     100 │   │
     101 │   │   # tuples are not preserved by std python serializer
     102 │   │   if isinstance(o, tuple):

     ── locals ──
        o = {'__classname__': 'builtins.tuple', '__version__': 1, '__data__': ['Hello', 'XCom!']}
     self = <airflow.utils.json.XComEncoder object at 0xffff599de6a0>

   AttributeError: reserved key __classname__ found in dict to serialize
   ```
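
The failure mode in the traceback can be reproduced without Airflow. The sketch below is a simplified stand-in, not Airflow's actual code: `encode_tuple`, `StrictEncoder`, and `double_serialize` are hypothetical names, and only the `__classname__` / `__version__` / `__data__` payload shape and the error message are taken from the traceback. It assumes the tuple is encoded once before it reaches the server and is then passed through a second encoder that rejects the reserved key.

```python
import json
from typing import Any

# Key names as they appear in the traceback's `value` dict.
CLASSNAME = "__classname__"
VERSION = "__version__"
DATA = "__data__"


def encode_tuple(value: tuple) -> dict:
    """Hypothetical first pass: wrap a tuple in a JSON-compatible dict."""
    return {CLASSNAME: "builtins.tuple", VERSION: 1, DATA: list(value)}


class StrictEncoder(json.JSONEncoder):
    """Hypothetical second pass: refuse dicts that already carry the reserved key."""

    def encode(self, o: Any) -> str:
        if isinstance(o, dict) and CLASSNAME in o:
            raise AttributeError(f"reserved key {CLASSNAME} found in dict to serialize")
        return super().encode(o)


def double_serialize(value: tuple) -> str:
    """Encode the tuple once, then serialize the result again with StrictEncoder."""
    payload = encode_tuple(value)
    # json.dumps instantiates StrictEncoder and calls its encode() on the payload.
    return json.dumps(payload, cls=StrictEncoder)


if __name__ == "__main__":
    try:
        double_serialize(("Hello", "XCom!"))
    except AttributeError as exc:
        print(f"AttributeError: {exc}")
```

In this sketch a plain list passes through `StrictEncoder` unchanged; only the already-encoded tuple dict trips the reserved-key check, which matches the behavior reported above.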
   
   To reproduce, use any simple DAG with a task that returns a tuple as its XCom. Example:
   ```
   from airflow import DAG
   from airflow.providers.standard.operators.python import PythonOperator
   
   def push_to_xcom(**kwargs):
       # The returned tuple is pushed as the task's `return_value` XCom.
       value = ("Hello", "XCom!")
       return value
   
   with DAG(
       'xcom_tuple_return',
       schedule=None,
       catchup=False,
   ) as dag:
   
       push_xcom_task = PythonOperator(
           task_id='tuple_task',
           python_callable=push_to_xcom,
       )
   ```
   
   
   
   ### Committer
   
   - [x] I acknowledge that I am a maintainer/committer of the Apache Airflow 
project.

