Repository: spark
Updated Branches:
refs/heads/branch-1.5 b3f1e6533 -> 49355d0e0
[SPARK-10474] [SQL] Aggregation fails to allocate memory for pointer array
When `TungstenAggregation` hits memory pressure, it switches from hash-based to
sort-based aggregation in-place. However, in the process we try to allocate the
pointer array for writing to the new `UnsafeExternalSorter` *before* actually
freeing the memory from the hash map. This led to the following exception:
```
java.io.IOException: Could not acquire 65536 bytes of memory
at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169)
at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220)
at
org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:126)
at
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257)
at
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435)
```
Author: Andrew Or <[email protected]>
Closes #8827 from andrewor14/allocate-pointer-array.
(cherry picked from commit 7ff8d68cc19299e16dedfd819b9e96480fa6cf44)
Signed-off-by: Andrew Or <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49355d0e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49355d0e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49355d0e
Branch: refs/heads/branch-1.5
Commit: 49355d0e032cfe82b907e6cb45c0b894387ba46b
Parents: b3f1e65
Author: Andrew Or <[email protected]>
Authored: Fri Sep 18 23:58:25 2015 -0700
Committer: Andrew Or <[email protected]>
Committed: Fri Sep 18 23:58:36 2015 -0700
----------------------------------------------------------------------
.../unsafe/sort/UnsafeExternalSorter.java | 14 +++++-
.../sql/execution/UnsafeKVExternalSorter.java | 8 +++-
.../UnsafeFixedWidthAggregationMapSuite.scala | 49 +++++++++++++++++++-
3 files changed, 66 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/49355d0e/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index fc364e0..14b6aaf 100644
---
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -159,7 +159,7 @@ public final class UnsafeExternalSorter {
/**
* Allocates new sort data structures. Called when creating the sorter and
after each spill.
*/
- private void initializeForWriting() throws IOException {
+ public void initializeForWriting() throws IOException {
this.writeMetrics = new ShuffleWriteMetrics();
final long pointerArrayMemory =
UnsafeInMemorySorter.getMemoryRequirementsForPointerArray(initialSize);
@@ -187,6 +187,14 @@ public final class UnsafeExternalSorter {
* Sort and spill the current records in response to memory pressure.
*/
public void spill() throws IOException {
+ spill(true);
+ }
+
+ /**
+ * Sort and spill the current records in response to memory pressure.
+ * @param shouldInitializeForWriting whether to allocate memory for writing
after the spill
+ */
+ public void spill(boolean shouldInitializeForWriting) throws IOException {
assert(inMemSorter != null);
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
@@ -217,7 +225,9 @@ public final class UnsafeExternalSorter {
// written to disk. This also counts the space needed to store the
sorter's pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
- initializeForWriting();
+ if (shouldInitializeForWriting) {
+ initializeForWriting();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/49355d0e/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 7db6b7f..b81f67a 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -85,6 +85,7 @@ public final class UnsafeKVExternalSorter {
// We will use the number of elements in the map as the initialSize of
the
// UnsafeInMemorySorter. Because UnsafeInMemorySorter does not accept 0
as the initialSize,
// we will use 1 as its initial size if the map is empty.
+ // TODO: track pointer array memory used by this in-memory sorter!
final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
taskMemoryManager, recordComparator, prefixComparator, Math.max(1,
map.numElements()));
@@ -123,8 +124,13 @@ public final class UnsafeKVExternalSorter {
pageSizeBytes,
inMemSorter);
- sorter.spill();
+ // Note: This spill doesn't actually release any memory, so if we try to
allocate a new
+ // pointer array immediately after the spill then we may fail to acquire
sufficient space
+ // for it (SPARK-10474). For this reason, we must initialize for writing
explicitly *after*
+ // we have actually freed memory from our map.
+ sorter.spill(false /* initialize for writing */);
map.free();
+ sorter.initializeForWriting();
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/49355d0e/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index d1f0b2b..ada4d42 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -23,9 +23,10 @@ import scala.util.{Try, Random}
import org.scalatest.Matchers
-import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
import org.apache.spark.{TaskContextImpl, TaskContext, SparkFunSuite}
+import org.apache.spark.shuffle.ShuffleMemoryManager
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator,
TaskMemoryManager}
@@ -325,7 +326,7 @@ class UnsafeFixedWidthAggregationMapSuite
// At here, we also test if copy is correct.
iter.getKey.copy()
iter.getValue.copy()
- count += 1;
+ count += 1
}
// 1 record was from the map and 4096 records were explicitly inserted.
@@ -333,4 +334,48 @@ class UnsafeFixedWidthAggregationMapSuite
map.free()
}
+
+ testWithMemoryLeakDetection("convert to external sorter under memory
pressure (SPARK-10474)") {
+ val smm = ShuffleMemoryManager.createForTesting(65536)
+ val pageSize = 4096
+ val map = new UnsafeFixedWidthAggregationMap(
+ emptyAggregationBuffer,
+ aggBufferSchema,
+ groupKeySchema,
+ taskMemoryManager,
+ smm,
+ 128, // initial capacity
+ pageSize,
+ false // disable perf metrics
+ )
+
+ // Insert into the map until we've run out of space
+ val rand = new Random(42)
+ var hasSpace = true
+ while (hasSpace) {
+ val str = rand.nextString(1024)
+ val buf =
map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+ if (buf == null) {
+ hasSpace = false
+ } else {
+ buf.setInt(0, str.length)
+ }
+ }
+
+ // Ensure we're actually maxed out by asserting that we can't acquire even
just 1 byte
+ assert(smm.tryToAcquire(1) === 0)
+
+ // Convert the map into a sorter. This used to fail before the fix for
SPARK-10474
+ // because we would try to acquire space for the in-memory sorter pointer
array before
+ // actually releasing the pages despite having spilled all of them.
+ var sorter: UnsafeKVExternalSorter = null
+ try {
+ sorter = map.destructAndCreateExternalSorter()
+ } finally {
+ if (sorter != null) {
+ sorter.cleanupResources()
+ }
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]