This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 136afb17fe89 [SPARK-51667][SS][PYTHON] Disable Nagle's algorithm (via TCP_NODELAY = true) in TWS + PySpark for python <-> state server
136afb17fe89 is described below

commit 136afb17fe892fa4ca8277e69ea608304fec2599
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Tue Apr 1 06:53:32 2025 +0900

    [SPARK-51667][SS][PYTHON] Disable Nagle's algorithm (via TCP_NODELAY = true) in TWS + PySpark for python <-> state server
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to disable Nagle's algorithm (TCP_NODELAY = true) for the connection between the Python worker and the state server in TWS + PySpark.
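
A minimal standalone sketch (not the Spark code) of the socket option this PR sets: it opens a loopback TCP connection and enables TCP_NODELAY on both the connecting socket, as the Python worker does with `setsockopt`, and the accepted socket, as the JVM state server does with `Socket.setTcpNoDelay(true)`. The helper name `open_nodelay_pair` and the use of `127.0.0.1` with an OS-chosen port are assumptions made for illustration.

```python
import socket


def open_nodelay_pair():
    """Hypothetical helper: open a loopback TCP connection and disable
    Nagle's algorithm (TCP_NODELAY = 1) on both of its ends."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
    server.listen(1)
    port = server.getsockname()[1]

    # Connecting end, like the Python worker connecting to the state server.
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(("127.0.0.1", port))
    client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    # Accepted end, like the socket returned by accept() in the state server.
    accepted, _ = server.accept()
    accepted.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    server.close()  # only the listening socket is closed; the pair stays open
    return client, accepted


if __name__ == "__main__":
    client, accepted = open_nodelay_pair()
    try:
        for sock in (client, accepted):
            # getsockopt returns a non-zero int while TCP_NODELAY is enabled.
            print(sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) != 0)
    finally:
        client.close()
        accepted.close()
```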
    
    ### Why are the changes needed?
    
    We have observed a consistent latency increase of slightly more than 40ms on specific state interactions, e.g. ListState.put() / ListState.get() / ListState.appendList().
    
    The root cause turned out to be the bad interaction between Nagle's algorithm and delayed ACK. The sequence is as follows:
    
    1. The Python worker sends the proto message to the JVM and flushes the socket.
    2. The Python worker then sends the follow-up data to the JVM and flushes the socket again.
    3. The JVM reads the proto message and realizes that follow-up data is expected.
    4. The JVM reads the follow-up data.
    5. The JVM processes the request and sends the response back to the Python worker.
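
The write-write-read pattern in steps 1 to 5 can be sketched in plain Python, with a thread standing in for the JVM state server. This is an illustration under assumptions: the 4-byte big-endian length-prefixed framing, the `|`-joined echo response, and the names `send_frame`, `read_frame`, `server_loop` and `round_trip` are hypothetical and do not describe the actual TWS + PySpark wire protocol.

```python
import socket
import struct
import threading


def send_frame(sock_file, payload: bytes) -> None:
    # Hypothetical framing: 4-byte big-endian length prefix, then the payload.
    sock_file.write(struct.pack(">i", len(payload)))
    sock_file.write(payload)
    sock_file.flush()  # each flush tends to become its own small TCP segment


def read_frame(sock_file) -> bytes:
    (length,) = struct.unpack(">i", sock_file.read(4))
    return sock_file.read(length)


def server_loop(conn: socket.socket) -> None:
    """Stand-in for the JVM state server (steps 3 to 5)."""
    with conn, conn.makefile("rwb") as f:
        request = read_frame(f)    # step 3: read the "proto" message
        follow_up = read_frame(f)  # step 4: block until the follow-up arrives
        send_frame(f, request + b"|" + follow_up)  # step 5: respond


def round_trip(request: bytes, follow_up: bytes, nodelay: bool = True) -> bytes:
    """Stand-in for the Python worker (steps 1 and 2, then wait for step 5)."""
    server = socket.socket()
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    client = socket.create_connection(server.getsockname())
    conn, _ = server.accept()
    server.close()
    if nodelay:
        for s in (client, conn):
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    worker = threading.Thread(target=server_loop, args=(conn,))
    worker.start()
    with client, client.makefile("rwb") as f:
        send_frame(f, request)    # step 1: send the proto message and flush
        send_frame(f, follow_up)  # step 2: send the follow-up data and flush
        response = read_frame(f)  # wait for the response from step 5
    worker.join()
    return response
```

With `nodelay=False`, the second `send_frame` call in `round_trip` is exactly the kind of small write that Nagle's algorithm may hold back while the first one is still unacknowledged.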
    
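    Due to delayed ACK, even after step 3, the ACK for the message from step 1 is not sent back from the JVM to the Python worker. The JVM's TCP stack holds the ACK back, expecting to piggyback it on outgoing data or to acknowledge several segments at once, but the JVM is not going to send any data in this phase since it is waiting for the follow-up data.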
    
    Due to Nagle's algorithm, the message from step 2 is not sent to the JVM, since the message from step 1 has not been acknowledged yet (there is in-flight unacknowledged data).
    
    This deadlock is only resolved when the delayed ACK timer fires, after 40ms (the minimum delayed ACK timeout) on Linux. Once it fires, the ACK is sent back from the JVM to the Python worker, and Nagle's algorithm finally allows the message from step 2 to be sent to the JVM.
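
To make the timing concrete, here is a deliberately simplified model of the stall described above. It assumes both messages are smaller than the MSS and written back to back, that the receiver sends nothing until it has both, and an illustrative one-way delay of 0.05 ms; it ignores Linux's quick-ACK heuristics and other TCP details, so it is a back-of-the-envelope sketch rather than a simulation of the real stack. The function name `follow_up_arrival_ms` is hypothetical.

```python
# Minimum delayed ACK timeout on Linux, as stated in the description above.
LINUX_MIN_DELAYED_ACK_MS = 40.0


def follow_up_arrival_ms(nagle_enabled: bool,
                         one_way_ms: float = 0.05,
                         delayed_ack_ms: float = LINUX_MIN_DELAYED_ACK_MS) -> float:
    """Simplified model: ms from sending the first small message (step 1)
    until the receiver has the follow-up message (step 2).

    Assumptions (illustrative only): both messages are smaller than the MSS
    and written back to back, and the receiver sends no data until it has
    both, so its ACK for the first message only goes out when the
    delayed-ACK timer fires.
    """
    if not nagle_enabled:
        # TCP_NODELAY: the follow-up is sent right away, no ACK needed.
        return one_way_ms
    first_arrives = one_way_ms                 # step 1 reaches the receiver
    ack_sent = first_arrives + delayed_ack_ms  # delayed ACK timer fires
    ack_arrives = ack_sent + one_way_ms        # sender finally sees the ACK
    return ack_arrives + one_way_ms            # Nagle releases step 2


if __name__ == "__main__":
    for nagle in (True, False):
        print(f"nagle={nagle}: {follow_up_arrival_ms(nagle):.2f} ms")
```

Under these assumptions the Nagle case lands just above 40 ms while the TCP_NODELAY case stays well under a millisecond, in line with the latency increase of slightly more than 40ms described above.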
    
    The direction can be flipped depending on the command; the same thing can happen in the opposite direction of communication, from the JVM to the Python worker.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually tested (by adding debug logs to measure the time spent in each state interaction).
    
    Beyond that, this should pass the existing tests, which will be verified by CI.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #50460 from HeartSaVioR/SPARK-51667.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit a760df7b84349974b9565df035b58ee92f82d9db)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../pyspark/sql/streaming/stateful_processor_api_client.py   | 12 ++++++++++++
 .../streaming/TransformWithStateInPandasStateServer.scala    | 12 ++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/python/pyspark/sql/streaming/stateful_processor_api_client.py b/python/pyspark/sql/streaming/stateful_processor_api_client.py
index 6fd56481bc61..fa50ed00738c 100644
--- a/python/pyspark/sql/streaming/stateful_processor_api_client.py
+++ b/python/pyspark/sql/streaming/stateful_processor_api_client.py
@@ -54,6 +54,18 @@ class StatefulProcessorApiClient:
         self.key_schema = key_schema
         self._client_socket = socket.socket()
         self._client_socket.connect(("localhost", state_server_port))
+
+        # SPARK-51667: We have a pattern of sending messages continuously from one side
+        # (Python -> JVM, and vice versa) before getting response from other side. Since most
+        # messages we are sending are small, this triggers the bad combination of Nagle's algorithm
+        # and delayed ACKs, which can cause a significant delay on the latency.
+        # See SPARK-51667 for more details on how this can be a problem.
+        #
+        # Disabling either would work, but it's more common to disable Nagle's algorithm; there is
+        # lot less reference to disabling delayed ACKs, while there are lots of resources to
+        # disable Nagle's algorithm.
+        self._client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+
         self.sockfile = self._client_socket.makefile(
             "rwb", int(os.environ.get("SPARK_BUFFER_SIZE", 65536))
         )
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServer.scala
index f665db8b5b12..f13df63c8f26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServer.scala
@@ -138,6 +138,18 @@ class TransformWithStateInPandasStateServer(
 
   def run(): Unit = {
     val listeningSocket = stateServerSocket.accept()
+
+    // SPARK-51667: We have a pattern of sending messages continuously from one side
+    // (Python -> JVM, and vice versa) before getting response from other side. Since most
+    // messages we are sending are small, this triggers the bad combination of Nagle's algorithm
+    // and delayed ACKs, which can cause a significant delay on the latency.
+    // See SPARK-51667 for more details on how this can be a problem.
+    //
+    // Disabling either would work, but it's more common to disable Nagle's algorithm; there is
+    // lot less reference to disabling delayed ACKs, while there are lots of resources to
+    // disable Nagle's algorithm.
+    listeningSocket.setTcpNoDelay(true)
+
     inputStream = new DataInputStream(
       new BufferedInputStream(listeningSocket.getInputStream))
     outputStream = new DataOutputStream(
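
The code comments in the diff note that disabling delayed ACKs would also work. For comparison only, here is a hedged Python sketch of that alternative, which is not part of this commit. It assumes Linux, where the `socket` module exposes `TCP_QUICKACK`; the helper name `try_enable_quickack` is hypothetical.

```python
import socket


def try_enable_quickack(sock: socket.socket) -> bool:
    """Hypothetical helper: ask the kernel to ACK immediately (TCP_QUICKACK).

    Returns False when socket.TCP_QUICKACK is unavailable (it is Linux-only).
    Per tcp(7), quickack mode is not permanent: the kernel may switch back to
    delayed ACKs later, so real code would need to set it again, typically
    after each read.
    """
    quickack = getattr(socket, "TCP_QUICKACK", None)
    if quickack is None:
        return False
    sock.setsockopt(socket.IPPROTO_TCP, quickack, 1)
    return True


if __name__ == "__main__":
    server = socket.socket()
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    client = socket.create_connection(server.getsockname())
    conn, _ = server.accept()
    # Delayed ACKs are a property of the side that receives data and ACKs it.
    print(try_enable_quickack(conn))
    for s in (client, conn, server):
        s.close()
```

Because quickack mode has to be re-armed while TCP_NODELAY stays set on the socket once enabled, TCP_NODELAY is plausibly the simpler switch to flip, which is consistent with the choice made in this commit.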

