This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new edc13fdb5bb [SPARK-44215][3.3][SHUFFLE] If num chunks are 0, then
server should throw a RuntimeException
edc13fdb5bb is described below
commit edc13fdb5bb38b0c2d5672882a2e50f44ac46e5c
Author: Chandni Singh <[email protected]>
AuthorDate: Wed Jul 5 20:48:05 2023 -0500
[SPARK-44215][3.3][SHUFFLE] If num chunks are 0, then server should throw a
RuntimeException
### What changes were proposed in this pull request?
The executor expects `numChunks` to be greater than 0. If it is 0, the
executor fails with:
```
23/06/20 19:07:37 ERROR task 2031.0 in stage 47.0 (TID 25018) Executor: Exception in task 2031.0 in stage 47.0 (TID 25018)
java.lang.ArithmeticException: / by zero
    at org.apache.spark.storage.PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(PushBasedFetchHelper.scala:128)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1047)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:90)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
```
Because this is an `ArithmeticException`, the executor does not fall back
to fetching the original blocks. It is not a `FetchFailure` either, so the
stage is not retried and the application fails.
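
For illustration only, the failure mode in the trace above can be reproduced
in isolation. This is a minimal sketch, assuming the failing step is an
integer division by the chunk count; the class `ChunkSizeSketch` and the
method `approxChunkSize` are hypothetical names and are not Spark code.

```java
// Hypothetical stand-in for a client-side computation that divides by the
// number of chunks. Not Spark's implementation.
final class ChunkSizeSketch {

  // Integer division: throws ArithmeticException("/ by zero") when numChunks is 0.
  static long approxChunkSize(long blockSize, int numChunks) {
    return blockSize / numChunks;
  }

  public static void main(String[] args) {
    System.out.println(approxChunkSize(1024L, 4));
    try {
      approxChunkSize(1024L, 0);
    } catch (ArithmeticException e) {
      // An unchecked exception like this is neither a fetch failure nor a
      // fallback trigger, so it propagates and fails the task.
      System.out.println("failed: " + e.getMessage());
    }
  }
}
```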
### Why are the changes needed?
The executor should fall back to fetching the original blocks instead of
failing, because zero chunks indicates a problem with the push-merged block.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Modified the existing UTs to validate that a `RuntimeException` is thrown
when `numChunks` is 0.
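
The guard and the condition the tests check can be sketched outside Spark as
follows. This is a simplified illustration, assuming only the arithmetic
shown in the diff below (the index file holds `long` offsets, and the first
entry is the zero offset); the class `MergedIndexSketch` and the method
`numChunksOrThrow` are hypothetical names.

```java
// Hypothetical, standalone version of the chunk-count check. It mirrors the
// arithmetic in the patch but does not touch any real index file.
final class MergedIndexSketch {

  // indexFileLength: size of the index file in bytes.
  // path: used only for the error message.
  static int numChunksOrThrow(long indexFileLength, String path) {
    int size = (int) indexFileLength;
    // First entry is the zero offset, so it does not count as a chunk.
    int numChunks = (size / Long.BYTES) - 1;
    if (numChunks <= 0) {
      throw new RuntimeException(String.format(
        "Merged shuffle index file %s is empty", path));
    }
    return numChunks;
  }

  public static void main(String[] args) {
    // Three 8-byte offsets: the zero offset plus two chunk boundaries.
    System.out.println(numChunksOrThrow(24L, "example.index"));
    try {
      // Only the zero offset is present, so there are no chunks.
      numChunksOrThrow(8L, "example.index");
    } catch (RuntimeException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Note that the condition is `numChunks <= 0`, so a zero-length index file,
which yields -1, is rejected as well.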
Closes #41859 from otterc/SPARK-44215-branch-3.3.
Authored-by: Chandni Singh <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../apache/spark/network/shuffle/RemoteBlockPushResolver.java | 4 ++++
.../spark/network/shuffle/RemoteBlockPushResolverSuite.java | 10 ++++++----
2 files changed, 10 insertions(+), 4 deletions(-)
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 62ab3402896..5397c618578 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -261,6 +261,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     int size = (int) indexFile.length();
     // First entry is the zero offset
     int numChunks = (size / Long.BYTES) - 1;
+    if (numChunks <= 0) {
+      throw new RuntimeException(String.format(
+        "Merged shuffle index file %s is empty", indexFile.getPath()));
+    }
     File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId);
     if (!metaFile.exists()) {
       throw new RuntimeException(String.format("Merged shuffle meta file %s not found",
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 603b20c7dba..225b296fa9b 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -270,8 +270,9 @@ public class RemoteBlockPushResolverSuite {
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
     stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
-    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
-    assertEquals("num-chunks", 0, blockMeta.getNumChunks());
+    RuntimeException e = assertThrows(RuntimeException.class,
+      () -> pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0));
+    assertTrue(e.getMessage().contains("is empty"));
   }
 
   @Test
@@ -284,8 +285,9 @@ public class RemoteBlockPushResolverSuite {
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
     stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
-    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
-    assertEquals("num-chunks", 0, blockMeta.getNumChunks());
+    RuntimeException e = assertThrows(RuntimeException.class,
+      () -> pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0));
+    assertTrue(e.getMessage().contains("is empty"));
   }
 
   @Test