This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new edc13fdb5bb [SPARK-44215][3.3][SHUFFLE] If num chunks are 0, then 
server should throw a RuntimeException
edc13fdb5bb is described below

commit edc13fdb5bb38b0c2d5672882a2e50f44ac46e5c
Author: Chandni Singh <[email protected]>
AuthorDate: Wed Jul 5 20:48:05 2023 -0500

    [SPARK-44215][3.3][SHUFFLE] If num chunks are 0, then server should throw a 
RuntimeException
    
    ### What changes were proposed in this pull request?
    The executor expects `numChunks` to be > 0. If it is zero, the executor 
fails with:
    ```
    23/06/20 19:07:37 ERROR task 2031.0 in stage 47.0 (TID 25018) Executor: 
Exception in task 2031.0 in stage 47.0 (TID 25018)
    java.lang.ArithmeticException: / by zero
            at 
org.apache.spark.storage.PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(PushBasedFetchHelper.scala:128)
            at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1047)
            at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:90)
            at 
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
            at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
            at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
            at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    ```
    Because this is an `ArithmeticException`, the executor does not fall back 
to fetching the original blocks. It is not a `FetchFailure` either, so the 
stage is not retried and the application fails.
    
    ### Why are the changes needed?
    The executor should fall back to fetching the original blocks instead of 
failing, because a chunk count of zero suggests that there is an issue with 
the push-merged block.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Modified the existing unit tests to validate that a `RuntimeException` is 
thrown when `numChunks` is 0.
    
    Closes #41859 from otterc/SPARK-44215-branch-3.3.
    
    Authored-by: Chandni Singh <[email protected]>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../apache/spark/network/shuffle/RemoteBlockPushResolver.java  |  4 ++++
 .../spark/network/shuffle/RemoteBlockPushResolverSuite.java    | 10 ++++++----
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 62ab3402896..5397c618578 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -261,6 +261,10 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     int size = (int) indexFile.length();
     // First entry is the zero offset
     int numChunks = (size / Long.BYTES) - 1;
+    if (numChunks <= 0) {
+      throw new RuntimeException(String.format(
+          "Merged shuffle index file %s is empty", indexFile.getPath()));
+    }
     File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, 
shuffleMergeId, reduceId);
     if (!metaFile.exists()) {
       throw new RuntimeException(String.format("Merged shuffle meta file %s 
not found",
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 603b20c7dba..225b296fa9b 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -270,8 +270,9 @@ public class RemoteBlockPushResolverSuite {
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
     stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0, 0));
-    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
-    assertEquals("num-chunks", 0, blockMeta.getNumChunks());
+    RuntimeException e = assertThrows(RuntimeException.class,
+        () -> pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0));
+    assertTrue(e.getMessage().contains("is empty"));
   }
 
   @Test
@@ -284,8 +285,9 @@ public class RemoteBlockPushResolverSuite {
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
     stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0, 0));
-    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
-    assertEquals("num-chunks", 0, blockMeta.getNumChunks());
+    RuntimeException e = assertThrows(RuntimeException.class,
+        () -> pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0));
+    assertTrue(e.getMessage().contains("is empty"));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to