This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d159feef80daa1c47125ff27fbc63184364e3da4
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jul 11 21:26:31 2025 +0530
[v3-0-test] Fix reading huge (XCom) response in TaskSDK task process
(#53186) (#53194)
If you tried to send a large XCom value, it would fail on the task/child
process side with this error:
> RuntimeError: unable to read full response in child. (We read 36476, but
expected 1310046)
(The exact number of bytes it was able to read depended on many different
factors, like the OS, the current state of the socket and other things.
Sometimes it would read up to 256kb fine, other times only 35kb as here.)
This is because the kernel-level read-side socket buffer was full, so that was
as much as the Supervisor could send. The fix is to read in a loop until we
get it all.
(cherry picked from commit b9620bf4b77ec45b0e776aa56cab2b6ed7744dbe)
Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
task-sdk/src/airflow/sdk/execution_time/comms.py | 19 +++++++-------
.../tests/task_sdk/execution_time/test_comms.py | 30 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 9 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py
b/task-sdk/src/airflow/sdk/execution_time/comms.py
index 97ed1761ad8..2c6dfea4e60 100644
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -228,15 +228,16 @@ class CommsDecoder(Generic[ReceiveMsgType, SendMsgType]):
length = int.from_bytes(len_bytes, byteorder="big")
buffer = bytearray(length)
- nread = self.socket.recv_into(buffer)
- if nread != length:
- raise RuntimeError(
- f"unable to read full response in child. (We read {nread}, but
expected {length})"
- )
- if nread == 0:
- raise EOFError(f"Request socket closed before response was
complete ({self.id_counter=})")
-
- resp = self.resp_decoder.decode(buffer)
+ mv = memoryview(buffer)
+
+ pos = 0
+ while pos < length:
+ nread = self.socket.recv_into(mv[pos:])
+ if nread == 0:
+ raise EOFError(f"Request socket closed before response was
complete ({self.id_counter=})")
+ pos += nread
+
+ resp = self.resp_decoder.decode(mv)
if maxfds:
return resp, fds or []
return resp
diff --git a/task-sdk/tests/task_sdk/execution_time/test_comms.py
b/task-sdk/tests/task_sdk/execution_time/test_comms.py
index 5adaa2562ab..48c7ad74a15 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_comms.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_comms.py
@@ -17,6 +17,7 @@
from __future__ import annotations
+import threading
import uuid
from socket import socketpair
@@ -81,3 +82,32 @@ class TestCommsDecoder:
assert msg.dag_rel_path == "/dev/null"
assert msg.bundle_info == BundleInfo(name="any-name",
version="any-version")
assert msg.start_date == timezone.datetime(2024, 12, 1, 1)
+
+ def test_huge_payload(self):
+ r, w = socketpair()
+
+ msg = {
+ "type": "XComResult",
+ "key": "a",
+ "value": ("a" * 10 * 1024 * 1024) + "b", # A 10mb xcom value
+ }
+
+ w.settimeout(1.0)
+ bytes = msgspec.msgpack.encode(_ResponseFrame(0, msg, None))
+
+ # Since `sendall` blocks, we need to do the send in another thread, so
we can perform the read here
+ t = threading.Thread(target=w.sendall, args=(len(bytes).to_bytes(4,
byteorder="big") + bytes,))
+ t.start()
+
+ decoder = CommsDecoder(socket=r, log=None)
+
+ try:
+ msg = decoder._get_response()
+ finally:
+ t.join(2)
+
+ assert msg is not None
+
+ # It actually failed to read at all for large values, but lets just
make sure we get it all
+ assert len(msg.value) == 10 * 1024 * 1024 + 1
+ assert msg.value[-1] == "b"