Repository: spark
Updated Branches:
  refs/heads/branch-2.0 1b90adc03 -> e3703c411


[SPARK-15259] Sort time metric should not include spill and record insertion time

## What changes were proposed in this pull request?

After SPARK-14669, the sort time metric appears to include both spill time and record
insertion time. This makes the metric much less useful, since it ends up close to
the total execution time of the node.

We should track only the time spent on the in-memory sort, as before.

## How was this patch tested?

Verified the metric in the UI; also added a unit test for UnsafeExternalSorter (UnsafeExternalSorterSuite.testSortTimeMetric).

cc davies

Author: Eric Liang <[email protected]>
Author: Eric Liang <[email protected]>

Closes #13035 from ericl/fix-metrics.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3703c41
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3703c41
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3703c41

Branch: refs/heads/branch-2.0
Commit: e3703c41177c01c41516d1669c1ffa239074e59f
Parents: 1b90adc
Author: Eric Liang <[email protected]>
Authored: Wed May 11 11:25:46 2016 -0700
Committer: Davies Liu <[email protected]>
Committed: Wed May 11 11:26:49 2016 -0700

----------------------------------------------------------------------
 .../unsafe/sort/UnsafeExternalSorter.java       | 13 +++++++++++++
 .../unsafe/sort/UnsafeInMemorySorter.java       | 11 +++++++++++
 .../unsafe/sort/UnsafeExternalSorterSuite.java  | 20 ++++++++++++++++++++
 .../sql/execution/UnsafeExternalRowSorter.java  |  7 +++++++
 .../apache/spark/sql/execution/SortExec.scala   |  9 ++-------
 5 files changed, 53 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e3703c41/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 8b6c96a..7dc0508 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -76,6 +76,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   private long pageCursor = -1;
   private long peakMemoryUsedBytes = 0;
   private long totalSpillBytes = 0L;
+  private long totalSortTimeNanos = 0L;
   private volatile SpillableIterator readingIterator = null;
 
   public static UnsafeExternalSorter createWithExistingInMemorySorter(
@@ -248,6 +249,17 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   }
 
   /**
+   * @return the total amount of time spent sorting data (in-memory only).
+   */
+  public long getSortTimeNanos() {
+    UnsafeInMemorySorter sorter = inMemSorter;
+    if (sorter != null) {
+      return sorter.getSortTimeNanos();
+    }
+    return totalSortTimeNanos;
+  }
+
+  /**
    * Return the total number of bytes that has been spilled into disk so far.
    */
   public long getSpillSize() {
@@ -505,6 +517,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
         // in-memory sorter will not be used after spilling
         assert(inMemSorter != null);
         released += inMemSorter.getMemoryUsage();
+        totalSortTimeNanos += inMemSorter.getSortTimeNanos();
         inMemSorter.free();
         inMemSorter = null;
         taskContext.taskMetrics().incMemoryBytesSpilled(released);

http://git-wip-us.apache.org/repos/asf/spark/blob/e3703c41/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 03973f3..0cce792 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -97,6 +97,8 @@ public final class UnsafeInMemorySorter {
 
   private long initialSize;
 
+  private long totalSortTimeNanos = 0L;
+
   public UnsafeInMemorySorter(
     final MemoryConsumer consumer,
     final TaskMemoryManager memoryManager,
@@ -160,6 +162,13 @@ public final class UnsafeInMemorySorter {
     return pos / 2;
   }
 
+  /**
+   * @return the total amount of time spent sorting data (in-memory only).
+   */
+  public long getSortTimeNanos() {
+    return totalSortTimeNanos;
+  }
+
   public long getMemoryUsage() {
     return array.size() * 8;
   }
@@ -265,6 +274,7 @@ public final class UnsafeInMemorySorter {
    */
   public SortedIterator getSortedIterator() {
     int offset = 0;
+    long start = System.nanoTime();
     if (sorter != null) {
       if (this.radixSortSupport != null) {
         // TODO(ekl) we should handle NULL values before radix sort for efficiency, since they
@@ -275,6 +285,7 @@ public final class UnsafeInMemorySorter {
         sorter.sort(array, 0, pos / 2, sortComparator);
       }
     }
+    totalSortTimeNanos += System.nanoTime() - start;
     return new SortedIterator(pos / 2, offset);
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e3703c41/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 60a40cc..2cae4be 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -49,6 +49,7 @@ import org.apache.spark.storage.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.util.Utils;
 
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.*;
 import static org.mockito.Answers.RETURNS_SMART_NULLS;
@@ -226,6 +227,25 @@ public class UnsafeExternalSorterSuite {
   }
 
   @Test
+  public void testSortTimeMetric() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    long prevSortTime = sorter.getSortTimeNanos();
+    assertEquals(prevSortTime, 0);
+
+    sorter.insertRecord(null, 0, 0, 0);
+    sorter.spill();
+    assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime));
+    prevSortTime = sorter.getSortTimeNanos();
+
+    sorter.spill();  // no sort needed
+    assertEquals(sorter.getSortTimeNanos(), prevSortTime);
+
+    sorter.insertRecord(null, 0, 0, 0);
+    UnsafeSorterIterator iter = sorter.getSortedIterator();
+    assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime));
+  }
+
+  @Test
   public void spillingOccursInResponseToMemoryPressure() throws Exception {
     final UnsafeExternalSorter sorter = newSorter();
     // This should be enough records to completely fill up a data page:

http://git-wip-us.apache.org/repos/asf/spark/blob/e3703c41/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 8d9906d..37fbad4 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -108,6 +108,13 @@ public final class UnsafeExternalRowSorter {
     return sorter.getPeakMemoryUsedBytes();
   }
 
+  /**
+   * @return the total amount of time spent sorting data (in-memory only).
+   */
+  public long getSortTimeNanos() {
+    return sorter.getSortTimeNanos();
+  }
+
   private void cleanupResources() {
     sorter.cleanupResources();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e3703c41/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index 0e4d6d7..66a16ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -97,11 +97,8 @@ case class SortExec(
       // Remember spill data size of this task before execute this operator so that we can
       // figure out how many bytes we spilled for this operator.
       val spillSizeBefore = metrics.memoryBytesSpilled
-      val beforeSort = System.nanoTime()
-
       val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
-
-      sortTime += (System.nanoTime() - beforeSort) / 1000000
+      sortTime += sorter.getSortTimeNanos / 1000000
       peakMemory += sorter.getPeakMemoryUsage
       spillSize += metrics.memoryBytesSpilled - spillSizeBefore
       metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
@@ -151,15 +148,13 @@ case class SortExec(
     val peakMemory = metricTerm(ctx, "peakMemory")
     val spillSize = metricTerm(ctx, "spillSize")
     val spillSizeBefore = ctx.freshName("spillSizeBefore")
-    val startTime = ctx.freshName("startTime")
     val sortTime = metricTerm(ctx, "sortTime")
     s"""
        | if ($needToSort) {
        |   long $spillSizeBefore = $metrics.memoryBytesSpilled();
-       |   long $startTime = System.nanoTime();
        |   $addToSorter();
        |   $sortedIterator = $sorterVariable.sort();
-       |   $sortTime.add((System.nanoTime() - $startTime) / 1000000);
+       |   $sortTime.add($sorterVariable.getSortTimeNanos() / 1000000);
        |   $peakMemory.add($sorterVariable.getPeakMemoryUsage());
        |   $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
        |   $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to