This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3e72806bb42 [SPARK-44215][SHUFFLE] If num chunks are 0, then server
should throw a RuntimeException
3e72806bb42 is described below
commit 3e72806bb421b103bf6e73518b80200ccdd58ce5
Author: Chandni Singh <[email protected]>
AuthorDate: Tue Jul 4 18:46:18 2023 -0500
[SPARK-44215][SHUFFLE] If num chunks are 0, then server should throw a
RuntimeException
### What changes were proposed in this pull request?
The executor expects `numChunks` to be > 0. If it is zero, the executor fails
with:
```
23/06/20 19:07:37 ERROR task 2031.0 in stage 47.0 (TID 25018) Executor: Exception in task 2031.0 in stage 47.0 (TID 25018)
java.lang.ArithmeticException: / by zero
at org.apache.spark.storage.PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(PushBasedFetchHelper.scala:128)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1047)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:90)
at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
```
Because this is an `ArithmeticException`, the executor doesn't fall back.
It's not a `FetchFailure` either, so the stage is not retried and the
whole application fails.
### Why are the changes needed?
The executor should fall back to fetching the original blocks rather than
fail, because a zero chunk count indicates a problem with the push-merged block.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Modified the existing UTs to validate that a RuntimeException is thrown when
numChunks is 0.
Closes #41762 from otterc/SPARK-44215.
Authored-by: Chandni Singh <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../network/shuffle/RemoteBlockPushResolver.java | 4 ++++
.../shuffle/RemoteBlockPushResolverSuite.java | 24 ++++++++++++++--------
2 files changed, 20 insertions(+), 8 deletions(-)
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 7f0862fcef4..b95a8700109 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -328,6 +328,10 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
int size = (int) indexFile.length();
// First entry is the zero offset
int numChunks = (size / Long.BYTES) - 1;
+ if (numChunks <= 0) {
+ throw new RuntimeException(String.format(
+ "Merged shuffle index file %s is empty", indexFile.getPath()));
+ }
File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId,
shuffleMergeId, reduceId);
if (!metaFile.exists()) {
throw new RuntimeException(String.format("Merged shuffle meta file %s
not found",
diff --git
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 2526a94f429..0847121b0cc 100644
---
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -281,7 +281,7 @@ public class RemoteBlockPushResolverSuite {
verifyMetrics(4, 0, 0, 0, 0, 0, 4);
}
- @Test
+ @Test(expected = RuntimeException.class)
public void testFailureAfterData() throws IOException {
StreamCallbackWithID stream =
pushResolver.receiveBlockDataAsStream(
@@ -289,12 +289,16 @@ public class RemoteBlockPushResolverSuite {
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0, 0));
- MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0, 0);
- assertEquals("num-chunks", 0, blockMeta.getNumChunks());
- verifyMetrics(4, 0, 0, 0, 0, 0, 4);
+ try {
+ pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
+ } catch (RuntimeException e) {
+ assertTrue(e.getMessage().contains("is empty"));
+ verifyMetrics(4, 0, 0, 0, 0, 0, 4);
+ throw e;
+ }
}
- @Test
+ @Test(expected = RuntimeException.class)
public void testFailureAfterMultipleDataBlocks() throws IOException {
StreamCallbackWithID stream =
pushResolver.receiveBlockDataAsStream(
@@ -304,9 +308,13 @@ public class RemoteBlockPushResolverSuite {
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0, 0));
- MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0, 0);
- assertEquals("num-chunks", 0, blockMeta.getNumChunks());
- verifyMetrics(9, 0, 0, 0, 0, 0, 9);
+ try {
+ pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
+ } catch (RuntimeException e) {
+ assertTrue(e.getMessage().contains("is empty"));
+ verifyMetrics(9, 0, 0, 0, 0, 0, 9);
+ throw e;
+ }
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]