This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e26fa3fef27 [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics 
for Push-based shuffle
e26fa3fef27 is described below

commit e26fa3fef273975c00a4668721a95bea1ba7f770
Author: Minchu Yang <[email protected]>
AuthorDate: Thu Jan 12 00:53:16 2023 -0600

    [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based 
shuffle
    
    ### What changes were proposed in this pull request?
    This is one of the patches for SPARK-33235: Push-based Shuffle Improvement 
Tasks.
    Added a class `PushMergeMetrics` to collect the following metrics from the
    shuffle server side for Push-based shuffle:
    - no opportunity responses
    - too late responses
    - pushed bytes written
    - deferred block bytes
    - number of deferred blocks
    - stale block push requests
    - ignored block bytes
    ### Why are the changes needed?
    This helps to understand the push-based shuffle metrics from the shuffle
    server side.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added a method `verifyMetrics` to verify those metrics in existing unit 
tests.
    
    Closes #37638 from rmcyang/SPARK-33573-1.
    
    Lead-authored-by: Minchu Yang <[email protected]>
    Co-authored-by: Minchu Yang <[email protected]>
    Signed-off-by: Mridul <mridul<at>gmail.com>
---
 .../network/shuffle/MergedShuffleFileManager.java  |  13 +++
 .../network/shuffle/RemoteBlockPushResolver.java   | 124 +++++++++++++++++++--
 .../shuffle/RemoteBlockPushResolverSuite.java      |  58 +++++++++-
 .../spark/network/yarn/YarnShuffleService.java     |   5 +
 docs/monitoring.md                                 |  15 +++
 5 files changed, 203 insertions(+), 12 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
index 051684a92d0..7176b30ba08 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
@@ -18,6 +18,9 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.IOException;
+import java.util.Collections;
+
+import com.codahale.metrics.MetricSet;
 
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.network.buffer.ManagedBuffer;
@@ -126,4 +129,14 @@ public interface MergedShuffleFileManager {
    * leveldb for state persistence.
    */
   default void close() {}
+
+  /**
+   * Get the metrics associated with the MergedShuffleFileManager. E.g., this 
is used to collect
+   * the push merged metrics within RemoteBlockPushResolver.
+   *
+   * @return the map contains the metrics
+   */
+  default MetricSet getMetrics() {
+    return Collections::emptyMap;
+  }
 }
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index c3a2e9a883a..6a65e6ccfab 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -29,6 +29,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +49,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
@@ -133,6 +138,8 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
   @SuppressWarnings("UnstableApiUsage")
   private final LoadingCache<String, ShuffleIndexInformation> indexCache;
 
+  private final PushMergeMetrics pushMergeMetrics;
+
   @VisibleForTesting
   final File recoveryFile;
 
@@ -171,6 +178,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
         dbBackend, Constants.SHUFFLE_SERVICE_DB_BACKEND);
       reloadAndCleanUpAppShuffleInfo(db);
     }
+    this.pushMergeMetrics = new PushMergeMetrics();
   }
 
   @VisibleForTesting
@@ -504,6 +512,10 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     }
   }
 
+  public MetricSet getMetrics() {
+    return pushMergeMetrics;
+  }
+
   @Override
   public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
@@ -579,6 +591,9 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       return new PushBlockStreamCallback(
         this, appShuffleInfo, streamId, partitionInfo, msg.mapIndex);
     } else {
+      // The block is considered too late if it is received after the shuffle merge
+      // has been finalized, hence mark it as a late block push in pushMergeMetrics
+      pushMergeMetrics.lateBlockPushes.mark();
       final BlockPushNonFatalFailure finalFailure = failure;
       // For a duplicate block or a block which is late or stale block from an 
older
       // shuffleMergeId, respond back with a callback that handles them 
differently.
@@ -592,6 +607,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
         public void onData(String streamId, ByteBuffer buf) {
           // Ignore the requests. It reaches here either when a request is 
received after the
           // shuffle file is finalized or when a request is for a duplicate 
block.
+          pushMergeMetrics.ignoredBlockBytes.mark(buf.limit());
         }
 
         @Override
@@ -1137,7 +1153,9 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
         long updatedPos = partitionInfo.getDataFilePos() + length;
         logger.debug("{} current pos {} updated pos {}", partitionInfo,
           partitionInfo.getDataFilePos(), updatedPos);
-        length += partitionInfo.dataChannel.write(buf, updatedPos);
+        int bytesWritten = partitionInfo.dataChannel.write(buf, updatedPos);
+        length += bytesWritten;
+        mergeManager.pushMergeMetrics.blockBytesWritten.mark(bytesWritten);
       }
     }
 
@@ -1174,8 +1192,24 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
      * block parts buffered in memory.
      */
     private void writeDeferredBufs() throws IOException {
+      long totalSize = 0;
       for (ByteBuffer deferredBuf : deferredBufs) {
+        totalSize += deferredBuf.limit();
         writeBuf(deferredBuf);
+        mergeManager.pushMergeMetrics.deferredBlocks.mark(-1);
+      }
+      mergeManager.pushMergeMetrics.deferredBlockBytes.dec(totalSize);
+      deferredBufs = null;
+    }
+
+    private void freeDeferredBufs() {
+      if (deferredBufs != null && !deferredBufs.isEmpty()) {
+        long totalSize = 0;
+        for (ByteBuffer deferredBuf : deferredBufs) {
+          totalSize += deferredBuf.limit();
+          mergeManager.pushMergeMetrics.deferredBlocks.mark(-1);
+        }
+        mergeManager.pushMergeMetrics.deferredBlockBytes.dec(totalSize);
       }
       deferredBufs = null;
     }
@@ -1185,7 +1219,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
      */
     private void abortIfNecessary() {
       if 
(partitionInfo.shouldAbort(mergeManager.ioExceptionsThresholdDuringMerge)) {
-        deferredBufs = null;
+        freeDeferredBufs();
         throw new IllegalStateException(String.format("%s when merging %s",
           
ErrorHandler.BlockPushErrorHandler.IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX,
           streamId));
@@ -1245,9 +1279,16 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       synchronized (partitionInfo) {
         AppShuffleMergePartitionsInfo info =
             
appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
-        if (isStale(info, 
partitionInfo.appAttemptShuffleMergeId.shuffleMergeId) ||
-            isTooLate(info, partitionInfo.reduceId)) {
-          deferredBufs = null;
+        boolean isStaleBlockPush =
+            isStale(info, 
partitionInfo.appAttemptShuffleMergeId.shuffleMergeId);
+        boolean isTooLateBlockPush = isTooLate(info, partitionInfo.reduceId);
+        if (isStaleBlockPush || isTooLateBlockPush) {
+          freeDeferredBufs();
+          if (isTooLateBlockPush) {
+            mergeManager.pushMergeMetrics.lateBlockPushes.mark();
+          } else {
+            mergeManager.pushMergeMetrics.staleBlockPushes.mark();
+          }
           return;
         }
         // Check whether we can write to disk
@@ -1255,7 +1296,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
           // Identify duplicate block generated by speculative tasks. We 
respond success to
           // the client in cases of duplicate even though no data is written.
           if (isDuplicateBlock()) {
-            deferredBufs = null;
+            freeDeferredBufs();
             return;
           }
           abortIfNecessary();
@@ -1301,10 +1342,13 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
           // Write the buffer to the in-memory deferred cache. Since buf is a 
slice of a larger
           // byte buffer, we cache only the relevant bytes not the entire 
large buffer to save
           // memory.
-          ByteBuffer deferredBuf = ByteBuffer.allocate(buf.remaining());
+          int deferredLen = buf.remaining();
+          ByteBuffer deferredBuf = ByteBuffer.allocate(deferredLen);
           deferredBuf.put(buf);
           deferredBuf.flip();
           deferredBufs.add(deferredBuf);
+          mergeManager.pushMergeMetrics.deferredBlockBytes.inc(deferredLen);
+          mergeManager.pushMergeMetrics.deferredBlocks.mark();
         }
       }
     }
@@ -1321,13 +1365,15 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
         AppShuffleMergePartitionsInfo info =
             
appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
         if (isTooLate(info, partitionInfo.reduceId)) {
-          deferredBufs = null;
+          freeDeferredBufs();
+          mergeManager.pushMergeMetrics.lateBlockPushes.mark();
           throw new BlockPushNonFatalFailure(
             new BlockPushReturnCode(ReturnCode.TOO_LATE_BLOCK_PUSH.id(), 
streamId).toByteBuffer(),
             BlockPushNonFatalFailure.getErrorMsg(streamId, 
ReturnCode.TOO_LATE_BLOCK_PUSH));
         }
         if (isStale(info, 
partitionInfo.appAttemptShuffleMergeId.shuffleMergeId)) {
-          deferredBufs = null;
+          freeDeferredBufs();
+          mergeManager.pushMergeMetrics.staleBlockPushes.mark();
           throw new BlockPushNonFatalFailure(
             new BlockPushReturnCode(ReturnCode.STALE_BLOCK_PUSH.id(), 
streamId).toByteBuffer(),
             BlockPushNonFatalFailure.getErrorMsg(streamId, 
ReturnCode.STALE_BLOCK_PUSH));
@@ -1338,7 +1384,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
           // Identify duplicate block generated by speculative tasks. We 
respond success to
           // the client in cases of duplicate even though no data is written.
           if (isDuplicateBlock()) {
-            deferredBufs = null;
+            freeDeferredBufs();
             return;
           }
           if (partitionInfo.getCurrentMapIndex() < 0) {
@@ -1378,7 +1424,8 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
             partitionInfo.resetChunkTracker();
           }
         } else {
-          deferredBufs = null;
+          freeDeferredBufs();
+          mergeManager.pushMergeMetrics.blockAppendCollisions.mark();
           throw new BlockPushNonFatalFailure(
             new 
BlockPushReturnCode(ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(), streamId)
               .toByteBuffer(), BlockPushNonFatalFailure.getErrorMsg(
@@ -1954,4 +2001,59 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       return pos;
     }
   }
+
+  /**
+   * A class that wraps all the push-based shuffle service metrics.
+   */
+  static class PushMergeMetrics implements MetricSet {
+    // blockAppendCollisions tracks the number of shuffle push blocks collided 
in shuffle services
+    // as another block for the same reduce partition were being written
+    static final String BLOCK_APPEND_COLLISIONS_METRIC = 
"blockAppendCollisions";
+    // lateBlockPushes tracks the number of shuffle push blocks that are 
received in shuffle
+    // service after the specific shuffle merge has been finalized
+    static final String LATE_BLOCK_PUSHES_METRIC = "lateBlockPushes";
+    // blockBytesWritten tracks the size of the pushed block data written to 
file in bytes
+    static final String BLOCK_BYTES_WRITTEN_METRIC = "blockBytesWritten";
+    // deferredBlockBytes tracks the size of the current deferred block parts 
buffered in memory
+    static final String DEFERRED_BLOCK_BYTES_METRIC = "deferredBlockBytes";
+    // deferredBlocks tracks the number of the current deferred block parts 
buffered in memory
+    static final String DEFERRED_BLOCKS_METRIC = "deferredBlocks";
+    // staleBlockPushes tracks the number of stale shuffle block push requests
+    static final String STALE_BLOCK_PUSHES_METRIC = "staleBlockPushes";
+    // ignoredBlockBytes tracks the size of the blocks that are ignored after 
the shuffle file is
+    // finalized or when a request is for a duplicate block
+    static final String IGNORED_BLOCK_BYTES_METRIC = "ignoredBlockBytes";
+
+    private final Map<String, Metric> allMetrics;
+    private final Meter blockAppendCollisions;
+    private final Meter lateBlockPushes;
+    private final Meter blockBytesWritten;
+    private final Counter deferredBlockBytes;
+    private final Meter deferredBlocks;
+    private final Meter staleBlockPushes;
+    private final Meter ignoredBlockBytes;
+
+    private PushMergeMetrics() {
+      allMetrics = new HashMap<>();
+      blockAppendCollisions = new Meter();
+      allMetrics.put(BLOCK_APPEND_COLLISIONS_METRIC, blockAppendCollisions);
+      lateBlockPushes = new Meter();
+      allMetrics.put(LATE_BLOCK_PUSHES_METRIC, lateBlockPushes);
+      blockBytesWritten = new Meter();
+      allMetrics.put(BLOCK_BYTES_WRITTEN_METRIC, blockBytesWritten);
+      deferredBlockBytes = new Counter();
+      allMetrics.put(DEFERRED_BLOCK_BYTES_METRIC, deferredBlockBytes);
+      deferredBlocks = new Meter();
+      allMetrics.put(DEFERRED_BLOCKS_METRIC, deferredBlocks);
+      staleBlockPushes = new Meter();
+      allMetrics.put(STALE_BLOCK_PUSHES_METRIC, staleBlockPushes);
+      ignoredBlockBytes = new Meter();
+      allMetrics.put(IGNORED_BLOCK_BYTES_METRIC, ignoredBlockBytes);
+    }
+
+    @Override
+    public Map<String, Metric> getMetrics() {
+      return allMetrics;
+    }
+  }
 }
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 6a595ee346d..4bb2220d463 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -17,6 +17,9 @@
 
 package org.apache.spark.network.shuffle;
 
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -52,6 +55,7 @@ import 
org.apache.spark.network.buffer.FileSegmentManagedBuffer;
 import org.apache.spark.network.client.StreamCallbackWithID;
 import org.apache.spark.network.server.BlockPushNonFatalFailure;
 import 
org.apache.spark.network.shuffle.RemoteBlockPushResolver.MergeShuffleFile;
+import 
org.apache.spark.network.shuffle.RemoteBlockPushResolver.PushMergeMetrics;
 import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
 import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
@@ -141,6 +145,7 @@ public class RemoteBlockPushResolverSuite {
     validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4, 5}, new 
int[][]{{0}, {1}});
+    verifyMetrics(9, 0, 0, 0, 0, 0, 0);
   }
 
   @Test
@@ -157,6 +162,7 @@ public class RemoteBlockPushResolverSuite {
     validateMergeStatuses(statuses, new int[] {0}, new long[] {13});
     MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
     validateChunks(TEST_APP, 0, 0, 0, meta, new int[]{5, 5, 3}, new 
int[][]{{0, 1}, {2}, {3}});
+    verifyMetrics(13, 0, 0, 0, 0, 0, 0);
   }
 
   @Test
@@ -186,6 +192,10 @@ public class RemoteBlockPushResolverSuite {
         new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 1, 0, 0));
     // This should be deferred
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
+    verifyMetrics(2, 0, 0, 3, 1, 0, 0);
+    assertEquals("cached bytes", 3L,
+      ((Counter) pushResolver.getMetrics().getMetrics()
+        .get(PushMergeMetrics.DEFERRED_BLOCK_BYTES_METRIC)).getCount());
     // stream 1 now completes
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     stream1.onComplete(stream1.getID());
@@ -195,6 +205,7 @@ public class RemoteBlockPushResolverSuite {
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4, 6}, new 
int[][]{{0}, {1}});
+    verifyMetrics(10, 0, 0, 0, 0, 0, 0);
   }
 
   @Test
@@ -209,14 +220,19 @@ public class RemoteBlockPushResolverSuite {
     // This should be deferred
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
+    verifyMetrics(2, 0, 0, 6, 2, 0, 0);
+    assertEquals("cached bytes", 6L,
+      ((Counter) pushResolver.getMetrics().getMetrics()
+        .get(PushMergeMetrics.DEFERRED_BLOCK_BYTES_METRIC)).getCount());
     // stream 1 now completes
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     stream1.onComplete(stream1.getID());
-    // stream 2 now completes completes
+    // stream 2 now completes
     stream2.onComplete(stream2.getID());
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4, 6}, new 
int[][]{{0}, {1}});
+    verifyMetrics(10, 0, 0, 0, 0, 0, 0);
   }
 
   @Test
@@ -237,6 +253,7 @@ public class RemoteBlockPushResolverSuite {
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4}, new 
int[][]{{0}});
+    verifyMetrics(4, 0, 1, 0, 0, 0, 4);
   }
 
   @Test
@@ -259,6 +276,7 @@ public class RemoteBlockPushResolverSuite {
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4}, new 
int[][]{{0}});
+    verifyMetrics(4, 0, 0, 0, 0, 0, 0);
   }
 
   @Test
@@ -271,6 +289,7 @@ public class RemoteBlockPushResolverSuite {
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
     assertEquals("num-chunks", 0, blockMeta.getNumChunks());
+    verifyMetrics(4, 0, 0, 0, 0, 0, 0);
   }
 
   @Test
@@ -285,6 +304,7 @@ public class RemoteBlockPushResolverSuite {
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
     assertEquals("num-chunks", 0, blockMeta.getNumChunks());
+    verifyMetrics(9, 0, 0, 0, 0, 0, 0);
   }
 
   @Test
@@ -300,6 +320,7 @@ public class RemoteBlockPushResolverSuite {
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{9}, new 
int[][]{{0}});
+    verifyMetrics(9, 0, 0, 0, 0, 0, 0);
   }
 
   @Test
@@ -327,6 +348,7 @@ public class RemoteBlockPushResolverSuite {
     assertEquals(errorCode.failureBlockId, stream1.getID());
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{9}, new 
int[][]{{0}});
+    verifyMetrics(9, 0, 1, 0, 0, 0, 4);
   }
 
   @Test
@@ -361,6 +383,7 @@ public class RemoteBlockPushResolverSuite {
     FileSegmentManagedBuffer mb =
       (FileSegmentManagedBuffer) pushResolver.getMergedBlockData(TEST_APP, 0, 
0, 0, 0);
     assertArrayEquals(expectedBytes, mb.nioByteBuffer().array());
+    verifyMetrics(14, 0, 0, 0, 0, 0, 0);
   }
 
   @Test
@@ -400,6 +423,9 @@ public class RemoteBlockPushResolverSuite {
         new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 2, 0, 0));
     // This should be deferred
     stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[5]));
+    assertEquals("cached bytes", 5L,
+      ((Counter) pushResolver.getMetrics().getMetrics()
+        .get(PushMergeMetrics.DEFERRED_BLOCK_BYTES_METRIC)).getCount());
     // Since this stream didn't get any opportunity it will throw couldn't 
find opportunity error
     BlockPushNonFatalFailure e = assertThrows(BlockPushNonFatalFailure.class,
       () -> stream3.onComplete(stream3.getID()));
@@ -415,6 +441,7 @@ public class RemoteBlockPushResolverSuite {
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[] {4}, new int[][] 
{{0}});
+    verifyMetrics(4, 1, 0, 0, 0, 0, 0);
   }
 
   @Test
@@ -1047,6 +1074,7 @@ public class RemoteBlockPushResolverSuite {
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0, 2));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
2, 0);
     validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new 
int[][]{{0}});
+    verifyMetrics(6, 0, 0, 0, 0, 2, 0);
   }
 
   @Test
@@ -1081,6 +1109,7 @@ public class RemoteBlockPushResolverSuite {
 
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
2, 0);
     validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new 
int[][]{{0}});
+    verifyMetrics(6, 0, 0, 0, 0, 2, 0);
   }
 
   @Test
@@ -1434,6 +1463,33 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
+  private void verifyMetrics(
+      long expectedPushBytesWritten,
+      long expectedNoOpportunityResponses,
+      long expectedTooLateResponses,
+      long expectedDeferredBlocksBytes,
+      long expectedDeferredBlocks,
+      long expectedStaleBlockPushes,
+      long expectedIgnoredBlocksBytes) {
+    Map<String, Metric> metrics = pushResolver.getMetrics().getMetrics();
+    Meter writtenBytes = (Meter) 
metrics.get(PushMergeMetrics.BLOCK_BYTES_WRITTEN_METRIC);
+    assertEquals("bytes written", expectedPushBytesWritten, 
writtenBytes.getCount());
+    Meter collidedBlocks = (Meter) 
metrics.get(PushMergeMetrics.BLOCK_APPEND_COLLISIONS_METRIC);
+    assertEquals("could not find opportunity responses", 
expectedNoOpportunityResponses,
+      collidedBlocks.getCount());
+    Meter tooLateBlocks = (Meter) 
metrics.get(PushMergeMetrics.LATE_BLOCK_PUSHES_METRIC);
+    assertEquals("too late responses", expectedTooLateResponses, 
tooLateBlocks.getCount());
+    Counter cachedBytes = (Counter) 
metrics.get(PushMergeMetrics.DEFERRED_BLOCK_BYTES_METRIC);
+    assertEquals("cached block bytes", expectedDeferredBlocksBytes,
+      cachedBytes.getCount());
+    Meter deferredBlocks = (Meter) 
metrics.get(PushMergeMetrics.DEFERRED_BLOCKS_METRIC);
+    assertEquals("deferred blocks", expectedDeferredBlocks, 
deferredBlocks.getCount());
+    Meter staleBlockPushes = (Meter) 
metrics.get(PushMergeMetrics.STALE_BLOCK_PUSHES_METRIC);
+    assertEquals("stale block pushes", expectedStaleBlockPushes, 
staleBlockPushes.getCount());
+    Meter ignoredBlockBytes = (Meter) 
metrics.get(PushMergeMetrics.IGNORED_BLOCK_BYTES_METRIC);
+    assertEquals("ignored block bytes", expectedIgnoredBlocksBytes, 
ignoredBlockBytes.getCount());
+  }
+
   private static class PushBlock {
     private final int shuffleId;
     private final int shuffleMergeId;
diff --git 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index bde358a638a..25adc1da32e 100644
--- 
a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -305,10 +305,15 @@ public class YarnShuffleService extends AuxiliaryService {
           DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME);
       YarnShuffleServiceMetrics serviceMetrics =
           new YarnShuffleServiceMetrics(metricsNamespace, 
blockHandler.getAllMetrics());
+      YarnShuffleServiceMetrics mergeManagerMetrics =
+          new YarnShuffleServiceMetrics("mergeManagerMetrics", 
shuffleMergeManager.getMetrics());
 
       MetricsSystemImpl metricsSystem = (MetricsSystemImpl) 
DefaultMetricsSystem.instance();
       metricsSystem.register(
           metricsNamespace, "Metrics on the Spark Shuffle Service", 
serviceMetrics);
+      metricsSystem.register(
+          "PushBasedShuffleMergeManager", "Metrics on the push-based shuffle 
merge manager",
+          mergeManagerMetrics);
       logger.info("Registered metrics with Hadoop's DefaultMetricsSystem using 
namespace '{}'",
           metricsNamespace);
 
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 804bde92060..dbc2c14aea6 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -1421,6 +1421,21 @@ Note: applies to the shuffle service
 - shuffle-server.usedDirectMemory
 - shuffle-server.usedHeapMemory
 
+
+- **note:** the metrics below apply when the server side configuration
+  `spark.shuffle.push.server.mergedShuffleFileManagerImpl` is set to
+  `org.apache.spark.network.shuffle.RemoteBlockPushResolver` for Push-Based Shuffle
+- blockBytesWritten - the size of the pushed block data written to file in 
bytes
+- blockAppendCollisions - the number of shuffle push blocks collided in 
shuffle services
+  as another block for the same reduce partition were being written
+- lateBlockPushes - the number of shuffle push blocks that are received in 
shuffle service
+  after the specific shuffle merge has been finalized
+- deferredBlocks - the number of the current deferred block parts buffered in 
memory
+- deferredBlockBytes - the size of the current deferred block parts buffered 
in memory
+- staleBlockPushes - the number of stale shuffle block push requests
+- ignoredBlockBytes - the size of the pushed block data that are ignored after 
the shuffle
+  file is finalized or when a request is for a duplicate block
+
 # Advanced Instrumentation
 
 Several external tools can be used to help profile the performance of Spark 
jobs:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to