This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e26fa3fef27 [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics
for Push-based shuffle
e26fa3fef27 is described below
commit e26fa3fef273975c00a4668721a95bea1ba7f770
Author: Minchu Yang <[email protected]>
AuthorDate: Thu Jan 12 00:53:16 2023 -0600
[SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based
shuffle
### What changes were proposed in this pull request?
This is one of the patches for SPARK-33235: Push-based Shuffle Improvement
Tasks.
Added a class `PushMergeMetrics` to collect the following metrics from the
shuffle server side for Push-based shuffle:
- no opportunity responses
- too late responses
- pushed bytes written
- deferred block bytes
- number of deferred blocks
- stale block push requests
- ignored block bytes
### Why are the changes needed?
This helps to understand the push based shuffle metrics from shuffle server
side.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a method `verifyMetrics` to verify those metrics in existing unit
tests.
Closes #37638 from rmcyang/SPARK-33573-1.
Lead-authored-by: Minchu Yang <[email protected]>
Co-authored-by: Minchu Yang <[email protected]>
Signed-off-by: Mridul <mridul<at>gmail.com>
---
.../network/shuffle/MergedShuffleFileManager.java | 13 +++
.../network/shuffle/RemoteBlockPushResolver.java | 124 +++++++++++++++++++--
.../shuffle/RemoteBlockPushResolverSuite.java | 58 +++++++++-
.../spark/network/yarn/YarnShuffleService.java | 5 +
docs/monitoring.md | 15 +++
5 files changed, 203 insertions(+), 12 deletions(-)
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
index 051684a92d0..7176b30ba08 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
@@ -18,6 +18,9 @@
package org.apache.spark.network.shuffle;
import java.io.IOException;
+import java.util.Collections;
+
+import com.codahale.metrics.MetricSet;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.network.buffer.ManagedBuffer;
@@ -126,4 +129,14 @@ public interface MergedShuffleFileManager {
* leveldb for state persistence.
*/
default void close() {}
+
+ /**
+ * Get the metrics associated with the MergedShuffleFileManager. E.g., this
is used to collect
+ * the push merged metrics within RemoteBlockPushResolver.
+ *
+ * @return the map containing the metrics
+ */
+ default MetricSet getMetrics() {
+ return Collections::emptyMap;
+ }
}
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index c3a2e9a883a..6a65e6ccfab 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -29,6 +29,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -48,6 +49,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
@@ -133,6 +138,8 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
@SuppressWarnings("UnstableApiUsage")
private final LoadingCache<String, ShuffleIndexInformation> indexCache;
+ private final PushMergeMetrics pushMergeMetrics;
+
@VisibleForTesting
final File recoveryFile;
@@ -171,6 +178,7 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
dbBackend, Constants.SHUFFLE_SERVICE_DB_BACKEND);
reloadAndCleanUpAppShuffleInfo(db);
}
+ this.pushMergeMetrics = new PushMergeMetrics();
}
@VisibleForTesting
@@ -504,6 +512,10 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
}
}
+ public MetricSet getMetrics() {
+ return pushMergeMetrics;
+ }
+
@Override
public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
@@ -579,6 +591,9 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
return new PushBlockStreamCallback(
this, appShuffleInfo, streamId, partitionInfo, msg.mapIndex);
} else {
+ // The block would be considered too late if it is received after the
shuffle merge is finalized,
+ // and hence mark it as a late block push in the pushMergeMetrics
+ pushMergeMetrics.lateBlockPushes.mark();
final BlockPushNonFatalFailure finalFailure = failure;
// For a duplicate block or a block which is late or stale block from an
older
// shuffleMergeId, respond back with a callback that handles them
differently.
@@ -592,6 +607,7 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
public void onData(String streamId, ByteBuffer buf) {
// Ignore the requests. It reaches here either when a request is
received after the
// shuffle file is finalized or when a request is for a duplicate
block.
+ pushMergeMetrics.ignoredBlockBytes.mark(buf.limit());
}
@Override
@@ -1137,7 +1153,9 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
long updatedPos = partitionInfo.getDataFilePos() + length;
logger.debug("{} current pos {} updated pos {}", partitionInfo,
partitionInfo.getDataFilePos(), updatedPos);
- length += partitionInfo.dataChannel.write(buf, updatedPos);
+ int bytesWritten = partitionInfo.dataChannel.write(buf, updatedPos);
+ length += bytesWritten;
+ mergeManager.pushMergeMetrics.blockBytesWritten.mark(bytesWritten);
}
}
@@ -1174,8 +1192,24 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
* block parts buffered in memory.
*/
private void writeDeferredBufs() throws IOException {
+ long totalSize = 0;
for (ByteBuffer deferredBuf : deferredBufs) {
+ totalSize += deferredBuf.limit();
writeBuf(deferredBuf);
+ mergeManager.pushMergeMetrics.deferredBlocks.mark(-1);
+ }
+ mergeManager.pushMergeMetrics.deferredBlockBytes.dec(totalSize);
+ deferredBufs = null;
+ }
+
+ private void freeDeferredBufs() {
+ if (deferredBufs != null && !deferredBufs.isEmpty()) {
+ long totalSize = 0;
+ for (ByteBuffer deferredBuf : deferredBufs) {
+ totalSize += deferredBuf.limit();
+ mergeManager.pushMergeMetrics.deferredBlocks.mark(-1);
+ }
+ mergeManager.pushMergeMetrics.deferredBlockBytes.dec(totalSize);
}
deferredBufs = null;
}
@@ -1185,7 +1219,7 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
*/
private void abortIfNecessary() {
if
(partitionInfo.shouldAbort(mergeManager.ioExceptionsThresholdDuringMerge)) {
- deferredBufs = null;
+ freeDeferredBufs();
throw new IllegalStateException(String.format("%s when merging %s",
ErrorHandler.BlockPushErrorHandler.IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX,
streamId));
@@ -1245,9 +1279,16 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
synchronized (partitionInfo) {
AppShuffleMergePartitionsInfo info =
appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
- if (isStale(info,
partitionInfo.appAttemptShuffleMergeId.shuffleMergeId) ||
- isTooLate(info, partitionInfo.reduceId)) {
- deferredBufs = null;
+ boolean isStaleBlockPush =
+ isStale(info,
partitionInfo.appAttemptShuffleMergeId.shuffleMergeId);
+ boolean isTooLateBlockPush = isTooLate(info, partitionInfo.reduceId);
+ if (isStaleBlockPush || isTooLateBlockPush) {
+ freeDeferredBufs();
+ if (isTooLateBlockPush) {
+ mergeManager.pushMergeMetrics.lateBlockPushes.mark();
+ } else {
+ mergeManager.pushMergeMetrics.staleBlockPushes.mark();
+ }
return;
}
// Check whether we can write to disk
@@ -1255,7 +1296,7 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
// Identify duplicate block generated by speculative tasks. We
respond success to
// the client in cases of duplicate even though no data is written.
if (isDuplicateBlock()) {
- deferredBufs = null;
+ freeDeferredBufs();
return;
}
abortIfNecessary();
@@ -1301,10 +1342,13 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
// Write the buffer to the in-memory deferred cache. Since buf is a
slice of a larger
// byte buffer, we cache only the relevant bytes not the entire
large buffer to save
// memory.
- ByteBuffer deferredBuf = ByteBuffer.allocate(buf.remaining());
+ int deferredLen = buf.remaining();
+ ByteBuffer deferredBuf = ByteBuffer.allocate(deferredLen);
deferredBuf.put(buf);
deferredBuf.flip();
deferredBufs.add(deferredBuf);
+ mergeManager.pushMergeMetrics.deferredBlockBytes.inc(deferredLen);
+ mergeManager.pushMergeMetrics.deferredBlocks.mark();
}
}
}
@@ -1321,13 +1365,15 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
AppShuffleMergePartitionsInfo info =
appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
if (isTooLate(info, partitionInfo.reduceId)) {
- deferredBufs = null;
+ freeDeferredBufs();
+ mergeManager.pushMergeMetrics.lateBlockPushes.mark();
throw new BlockPushNonFatalFailure(
new BlockPushReturnCode(ReturnCode.TOO_LATE_BLOCK_PUSH.id(),
streamId).toByteBuffer(),
BlockPushNonFatalFailure.getErrorMsg(streamId,
ReturnCode.TOO_LATE_BLOCK_PUSH));
}
if (isStale(info,
partitionInfo.appAttemptShuffleMergeId.shuffleMergeId)) {
- deferredBufs = null;
+ freeDeferredBufs();
+ mergeManager.pushMergeMetrics.staleBlockPushes.mark();
throw new BlockPushNonFatalFailure(
new BlockPushReturnCode(ReturnCode.STALE_BLOCK_PUSH.id(),
streamId).toByteBuffer(),
BlockPushNonFatalFailure.getErrorMsg(streamId,
ReturnCode.STALE_BLOCK_PUSH));
@@ -1338,7 +1384,7 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
// Identify duplicate block generated by speculative tasks. We
respond success to
// the client in cases of duplicate even though no data is written.
if (isDuplicateBlock()) {
- deferredBufs = null;
+ freeDeferredBufs();
return;
}
if (partitionInfo.getCurrentMapIndex() < 0) {
@@ -1378,7 +1424,8 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
partitionInfo.resetChunkTracker();
}
} else {
- deferredBufs = null;
+ freeDeferredBufs();
+ mergeManager.pushMergeMetrics.blockAppendCollisions.mark();
throw new BlockPushNonFatalFailure(
new
BlockPushReturnCode(ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(), streamId)
.toByteBuffer(), BlockPushNonFatalFailure.getErrorMsg(
@@ -1954,4 +2001,59 @@ public class RemoteBlockPushResolver implements
MergedShuffleFileManager {
return pos;
}
}
+
+ /**
+ * A class that wraps all the push-based shuffle service metrics.
+ */
+ static class PushMergeMetrics implements MetricSet {
+ // blockAppendCollisions tracks the number of shuffle push blocks that collided
in shuffle services
+ // because another block for the same reduce partition was being written
+ static final String BLOCK_APPEND_COLLISIONS_METRIC =
"blockAppendCollisions";
+ // lateBlockPushes tracks the number of shuffle push blocks that are
received in shuffle
+ // service after the specific shuffle merge has been finalized
+ static final String LATE_BLOCK_PUSHES_METRIC = "lateBlockPushes";
+ // blockBytesWritten tracks the size of the pushed block data written to
file in bytes
+ static final String BLOCK_BYTES_WRITTEN_METRIC = "blockBytesWritten";
+ // deferredBlockBytes tracks the size of the current deferred block parts
buffered in memory
+ static final String DEFERRED_BLOCK_BYTES_METRIC = "deferredBlockBytes";
+ // deferredBlocks tracks the number of the current deferred block parts
buffered in memory
+ static final String DEFERRED_BLOCKS_METRIC = "deferredBlocks";
+ // staleBlockPushes tracks the number of stale shuffle block push requests
+ static final String STALE_BLOCK_PUSHES_METRIC = "staleBlockPushes";
+ // ignoredBlockBytes tracks the size of the blocks that are ignored after
the shuffle file is
+ // finalized or when a request is for a duplicate block
+ static final String IGNORED_BLOCK_BYTES_METRIC = "ignoredBlockBytes";
+
+ private final Map<String, Metric> allMetrics;
+ private final Meter blockAppendCollisions;
+ private final Meter lateBlockPushes;
+ private final Meter blockBytesWritten;
+ private final Counter deferredBlockBytes;
+ private final Meter deferredBlocks;
+ private final Meter staleBlockPushes;
+ private final Meter ignoredBlockBytes;
+
+ private PushMergeMetrics() {
+ allMetrics = new HashMap<>();
+ blockAppendCollisions = new Meter();
+ allMetrics.put(BLOCK_APPEND_COLLISIONS_METRIC, blockAppendCollisions);
+ lateBlockPushes = new Meter();
+ allMetrics.put(LATE_BLOCK_PUSHES_METRIC, lateBlockPushes);
+ blockBytesWritten = new Meter();
+ allMetrics.put(BLOCK_BYTES_WRITTEN_METRIC, blockBytesWritten);
+ deferredBlockBytes = new Counter();
+ allMetrics.put(DEFERRED_BLOCK_BYTES_METRIC, deferredBlockBytes);
+ deferredBlocks = new Meter();
+ allMetrics.put(DEFERRED_BLOCKS_METRIC, deferredBlocks);
+ staleBlockPushes = new Meter();
+ allMetrics.put(STALE_BLOCK_PUSHES_METRIC, staleBlockPushes);
+ ignoredBlockBytes = new Meter();
+ allMetrics.put(IGNORED_BLOCK_BYTES_METRIC, ignoredBlockBytes);
+ }
+
+ @Override
+ public Map<String, Metric> getMetrics() {
+ return allMetrics;
+ }
+ }
}
diff --git
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 6a595ee346d..4bb2220d463 100644
---
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -17,6 +17,9 @@
package org.apache.spark.network.shuffle;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
@@ -52,6 +55,7 @@ import
org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.server.BlockPushNonFatalFailure;
import
org.apache.spark.network.shuffle.RemoteBlockPushResolver.MergeShuffleFile;
+import
org.apache.spark.network.shuffle.RemoteBlockPushResolver.PushMergeMetrics;
import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
@@ -141,6 +145,7 @@ public class RemoteBlockPushResolverSuite {
validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0, 0);
validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4, 5}, new
int[][]{{0}, {1}});
+ verifyMetrics(9, 0, 0, 0, 0, 0, 0);
}
@Test
@@ -157,6 +162,7 @@ public class RemoteBlockPushResolverSuite {
validateMergeStatuses(statuses, new int[] {0}, new long[] {13});
MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
validateChunks(TEST_APP, 0, 0, 0, meta, new int[]{5, 5, 3}, new
int[][]{{0, 1}, {2}, {3}});
+ verifyMetrics(13, 0, 0, 0, 0, 0, 0);
}
@Test
@@ -186,6 +192,10 @@ public class RemoteBlockPushResolverSuite {
new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 1, 0, 0));
// This should be deferred
stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
+ verifyMetrics(2, 0, 0, 3, 1, 0, 0);
+ assertEquals("cached bytes", 3L,
+ ((Counter) pushResolver.getMetrics().getMetrics()
+ .get(PushMergeMetrics.DEFERRED_BLOCK_BYTES_METRIC)).getCount());
// stream 1 now completes
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
stream1.onComplete(stream1.getID());
@@ -195,6 +205,7 @@ public class RemoteBlockPushResolverSuite {
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0, 0);
validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4, 6}, new
int[][]{{0}, {1}});
+ verifyMetrics(10, 0, 0, 0, 0, 0, 0);
}
@Test
@@ -209,14 +220,19 @@ public class RemoteBlockPushResolverSuite {
// This should be deferred
stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
+ verifyMetrics(2, 0, 0, 6, 2, 0, 0);
+ assertEquals("cached bytes", 6L,
+ ((Counter) pushResolver.getMetrics().getMetrics()
+ .get(PushMergeMetrics.DEFERRED_BLOCK_BYTES_METRIC)).getCount());
// stream 1 now completes
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
stream1.onComplete(stream1.getID());
- // stream 2 now completes completes
+ // stream 2 now completes
stream2.onComplete(stream2.getID());
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0, 0);
validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4, 6}, new
int[][]{{0}, {1}});
+ verifyMetrics(10, 0, 0, 0, 0, 0, 0);
}
@Test
@@ -237,6 +253,7 @@ public class RemoteBlockPushResolverSuite {
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0, 0);
validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4}, new
int[][]{{0}});
+ verifyMetrics(4, 0, 1, 0, 0, 0, 4);
}
@Test
@@ -259,6 +276,7 @@ public class RemoteBlockPushResolverSuite {
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0, 0);
validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4}, new
int[][]{{0}});
+ verifyMetrics(4, 0, 0, 0, 0, 0, 0);
}
@Test
@@ -271,6 +289,7 @@ public class RemoteBlockPushResolverSuite {
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0, 0);
assertEquals("num-chunks", 0, blockMeta.getNumChunks());
+ verifyMetrics(4, 0, 0, 0, 0, 0, 0);
}
@Test
@@ -285,6 +304,7 @@ public class RemoteBlockPushResolverSuite {
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0, 0);
assertEquals("num-chunks", 0, blockMeta.getNumChunks());
+ verifyMetrics(9, 0, 0, 0, 0, 0, 0);
}
@Test
@@ -300,6 +320,7 @@ public class RemoteBlockPushResolverSuite {
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0, 0);
validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{9}, new
int[][]{{0}});
+ verifyMetrics(9, 0, 0, 0, 0, 0, 0);
}
@Test
@@ -327,6 +348,7 @@ public class RemoteBlockPushResolverSuite {
assertEquals(errorCode.failureBlockId, stream1.getID());
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0, 0);
validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{9}, new
int[][]{{0}});
+ verifyMetrics(9, 0, 1, 0, 0, 0, 4);
}
@Test
@@ -361,6 +383,7 @@ public class RemoteBlockPushResolverSuite {
FileSegmentManagedBuffer mb =
(FileSegmentManagedBuffer) pushResolver.getMergedBlockData(TEST_APP, 0,
0, 0, 0);
assertArrayEquals(expectedBytes, mb.nioByteBuffer().array());
+ verifyMetrics(14, 0, 0, 0, 0, 0, 0);
}
@Test
@@ -400,6 +423,9 @@ public class RemoteBlockPushResolverSuite {
new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 2, 0, 0));
// This should be deferred
stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[5]));
+ assertEquals("cached bytes", 5L,
+ ((Counter) pushResolver.getMetrics().getMetrics()
+ .get(PushMergeMetrics.DEFERRED_BLOCK_BYTES_METRIC)).getCount());
// Since this stream didn't get any opportunity it will throw couldn't
find opportunity error
BlockPushNonFatalFailure e = assertThrows(BlockPushNonFatalFailure.class,
() -> stream3.onComplete(stream3.getID()));
@@ -415,6 +441,7 @@ public class RemoteBlockPushResolverSuite {
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0, 0);
validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[] {4}, new int[][]
{{0}});
+ verifyMetrics(4, 1, 0, 0, 0, 0, 0);
}
@Test
@@ -1047,6 +1074,7 @@ public class RemoteBlockPushResolverSuite {
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0, 2));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
2, 0);
validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new
int[][]{{0}});
+ verifyMetrics(6, 0, 0, 0, 0, 2, 0);
}
@Test
@@ -1081,6 +1109,7 @@ public class RemoteBlockPushResolverSuite {
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
2, 0);
validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new
int[][]{{0}});
+ verifyMetrics(6, 0, 0, 0, 0, 2, 0);
}
@Test
@@ -1434,6 +1463,33 @@ public class RemoteBlockPushResolverSuite {
}
}
+ private void verifyMetrics(
+ long expectedPushBytesWritten,
+ long expectedNoOpportunityResponses,
+ long expectedTooLateResponses,
+ long expectedDeferredBlocksBytes,
+ long expectedDeferredBlocks,
+ long expectedStaleBlockPushes,
+ long expectedIgnoredBlocksBytes) {
+ Map<String, Metric> metrics = pushResolver.getMetrics().getMetrics();
+ Meter writtenBytes = (Meter)
metrics.get(PushMergeMetrics.BLOCK_BYTES_WRITTEN_METRIC);
+ assertEquals("bytes written", expectedPushBytesWritten,
writtenBytes.getCount());
+ Meter collidedBlocks = (Meter)
metrics.get(PushMergeMetrics.BLOCK_APPEND_COLLISIONS_METRIC);
+ assertEquals("could not find opportunity responses",
expectedNoOpportunityResponses,
+ collidedBlocks.getCount());
+ Meter tooLateBlocks = (Meter)
metrics.get(PushMergeMetrics.LATE_BLOCK_PUSHES_METRIC);
+ assertEquals("too late responses", expectedTooLateResponses,
tooLateBlocks.getCount());
+ Counter cachedBytes = (Counter)
metrics.get(PushMergeMetrics.DEFERRED_BLOCK_BYTES_METRIC);
+ assertEquals("cached block bytes", expectedDeferredBlocksBytes,
+ cachedBytes.getCount());
+ Meter deferredBlocks = (Meter)
metrics.get(PushMergeMetrics.DEFERRED_BLOCKS_METRIC);
+ assertEquals("deferred blocks", expectedDeferredBlocks,
deferredBlocks.getCount());
+ Meter staleBlockPushes = (Meter)
metrics.get(PushMergeMetrics.STALE_BLOCK_PUSHES_METRIC);
+ assertEquals("stale block pushes", expectedStaleBlockPushes,
staleBlockPushes.getCount());
+ Meter ignoredBlockBytes = (Meter)
metrics.get(PushMergeMetrics.IGNORED_BLOCK_BYTES_METRIC);
+ assertEquals("ignored block bytes", expectedIgnoredBlocksBytes,
ignoredBlockBytes.getCount());
+ }
+
private static class PushBlock {
private final int shuffleId;
private final int shuffleMergeId;
diff --git
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index bde358a638a..25adc1da32e 100644
---
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -305,10 +305,15 @@ public class YarnShuffleService extends AuxiliaryService {
DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME);
YarnShuffleServiceMetrics serviceMetrics =
new YarnShuffleServiceMetrics(metricsNamespace,
blockHandler.getAllMetrics());
+ YarnShuffleServiceMetrics mergeManagerMetrics =
+ new YarnShuffleServiceMetrics("mergeManagerMetrics",
shuffleMergeManager.getMetrics());
MetricsSystemImpl metricsSystem = (MetricsSystemImpl)
DefaultMetricsSystem.instance();
metricsSystem.register(
metricsNamespace, "Metrics on the Spark Shuffle Service",
serviceMetrics);
+ metricsSystem.register(
+ "PushBasedShuffleMergeManager", "Metrics on the push-based shuffle
merge manager",
+ mergeManagerMetrics);
logger.info("Registered metrics with Hadoop's DefaultMetricsSystem using
namespace '{}'",
metricsNamespace);
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 804bde92060..dbc2c14aea6 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -1421,6 +1421,21 @@ Note: applies to the shuffle service
- shuffle-server.usedDirectMemory
- shuffle-server.usedHeapMemory
+
+- **note:** the metrics below apply when the server side configuration
+  `spark.shuffle.push.server.mergedShuffleFileManagerImpl` is set to
+  `org.apache.spark.network.shuffle.RemoteBlockPushResolver` for Push-Based
Shuffle
+- blockBytesWritten - the size of the pushed block data written to file in
bytes
+- blockAppendCollisions - the number of shuffle push blocks that collided in
shuffle services
+  because another block for the same reduce partition was being written
+- lateBlockPushes - the number of shuffle push blocks that are received in
shuffle service
+ after the specific shuffle merge has been finalized
+- deferredBlocks - the number of the current deferred block parts buffered in
memory
+- deferredBlockBytes - the size of the current deferred block parts buffered
in memory
+- staleBlockPushes - the number of stale shuffle block push requests
+- ignoredBlockBytes - the size of the pushed block data that is ignored after
the shuffle
+  file is finalized or when a request is for a duplicate block
+
# Advanced Instrumentation
Several external tools can be used to help profile the performance of Spark
jobs:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]