amoghrajesh commented on code in PR #64598:
URL: https://github.com/apache/airflow/pull/64598#discussion_r3071458589
##########
airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py:
##########
@@ -118,6 +118,34 @@ class ModifyDeferredTaskKwargsToJsonValue(VersionChange):
schema(TIDeferredStatePayload).field("next_kwargs").had(type=dict[str, Any]),
)
+ @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type]
+ def convert_next_kwargs_to_base_serialization(response: ResponseInfo) -> None: # type: ignore[misc]
+ """
+ Convert next_kwargs from SDK serde format to BaseSerialization format for old workers.
+
+ Old workers (task-sdk < 1.2) only know BaseSerialization.deserialize(), which requires
+ dicts wrapped as {"__type": "dict", "__var": {...}}. SDK serde produces plain dicts that
+ BaseSerialization cannot parse, causing KeyError on __var.
+
+ We must deserialize SDK serde first to recover native Python objects (datetime,
+ timedelta, etc.), then re-serialize with BaseSerialization so old workers get
+ proper typed values instead of raw {"__classname__": ...} dicts.
+ """
+ next_kwargs = response.body.get("next_kwargs")
+ if next_kwargs is None:
+ return
+
+ from airflow.sdk.serde import deserialize
+ from airflow.serialization.serialized_objects import BaseSerialization
+
+ try:
+ plain = deserialize(next_kwargs)
+ except (ImportError, KeyError, AttributeError, TypeError):
+ # Already in BaseSerialization format (rolling upgrade, old data in DB)
+ return
+
+ response.body["next_kwargs"] = BaseSerialization.serialize(plain)
Review Comment:
Thank you, this makes sense and was a miss from me!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]