This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 201a91bab0e [SPARK-42366][SHUFFLE] Log shuffle data corruption
diagnose cause
201a91bab0e is described below
commit 201a91bab0e3d540ee262bd73b597b137f3f987b
Author: sychen <[email protected]>
AuthorDate: Thu Feb 9 10:36:11 2023 -0600
[SPARK-42366][SHUFFLE] Log shuffle data corruption diagnose cause
### What changes were proposed in this pull request?
Log the diagnosed corruption cause in the `diagnoseCorruption` method.
### Why are the changes needed?
This makes it convenient to collect the cause of shuffle corruption from the
shuffle service log when the service is deployed in a YARN NodeManager.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests.
Closes #39918 from cxzl25/SPARK-42366.
Authored-by: sychen <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../shuffle/checksum/ShuffleChecksumHelper.java | 20 +++++++++++++++-----
1 file changed, 15 insertions(+), 5 deletions(-)
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
index 4071088fe4b..6993be4c430 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
@@ -125,17 +125,18 @@ public class ShuffleChecksumHelper {
ManagedBuffer partitionData,
long checksumByReader) {
Cause cause;
+ long duration = -1L;
+ long checksumByWriter = -1L;
+ long checksumByReCalculation = -1L;
try {
long diagnoseStartNs = System.nanoTime();
// Try to get the checksum instance before reading the checksum file so
that
// `UnsupportedOperationException` can be thrown first before
`FileNotFoundException`
// when the checksum algorithm isn't supported.
Checksum checksumAlgo = getChecksumByAlgorithm(algorithm);
- long checksumByWriter = readChecksumByReduceId(checksumFile, reduceId);
- long checksumByReCalculation =
calculateChecksumForPartition(partitionData, checksumAlgo);
- long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
diagnoseStartNs);
- logger.info("Shuffle corruption diagnosis took {} ms, checksum file {}",
- duration, checksumFile.getAbsolutePath());
+ checksumByWriter = readChecksumByReduceId(checksumFile, reduceId);
+ checksumByReCalculation = calculateChecksumForPartition(partitionData,
checksumAlgo);
+ duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
diagnoseStartNs);
if (checksumByWriter != checksumByReCalculation) {
cause = Cause.DISK_ISSUE;
} else if (checksumByWriter != checksumByReader) {
@@ -153,6 +154,15 @@ public class ShuffleChecksumHelper {
logger.warn("Unable to diagnose shuffle block corruption", e);
cause = Cause.UNKNOWN_ISSUE;
}
+ if (logger.isDebugEnabled()) {
+ logger.debug("Shuffle corruption diagnosis took {} ms, checksum file {},
cause {}, " +
+ "checksumByReader {}, checksumByWriter {}, checksumByReCalculation {}",
+ duration, checksumFile.getAbsolutePath(), cause,
+ checksumByReader, checksumByWriter, checksumByReCalculation);
+ } else {
+ logger.info("Shuffle corruption diagnosis took {} ms, checksum file {},
cause {}",
+ duration, checksumFile.getAbsolutePath(), cause);
+ }
return cause;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]