Repository: spark
Updated Branches:
  refs/heads/master 264bc6362 -> cc71d4fa3


[SPARK-16123] Avoid NegativeArraySizeException while reserving additional 
capacity in VectorizedColumnReader

## What changes were proposed in this pull request?

This patch fixes an overflow bug in vectorized parquet reader where both 
off-heap and on-heap variants of `ColumnVector.reserve()` can unfortunately 
overflow while reserving additional capacity during reads.

## How was this patch tested?

Manual Tests

Author: Sameer Agarwal <[email protected]>

Closes #13832 from sameeragarwal/negative-array.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc71d4fa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc71d4fa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc71d4fa

Branch: refs/heads/master
Commit: cc71d4fa372f6eb187c68dbd8358de4003ace3fe
Parents: 264bc63
Author: Sameer Agarwal <[email protected]>
Authored: Thu Jun 23 18:21:41 2016 -0700
Committer: Herman van Hovell <[email protected]>
Committed: Thu Jun 23 18:21:41 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/vectorized/ColumnVector.java  | 26 ++++++++++++++++++--
 .../vectorized/OffHeapColumnVector.java         |  8 ++----
 .../vectorized/OnHeapColumnVector.java          |  8 ++----
 .../vectorized/ColumnarBatchSuite.scala         | 19 ++++++++++++++
 4 files changed, 47 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cc71d4fa/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 3f94255..80c84b1 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.vectorized;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.io.api.Binary;
@@ -27,6 +28,7 @@ import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -277,11 +279,25 @@ public abstract class ColumnVector implements 
AutoCloseable {
    */
   public abstract void close();
 
-  /*
+  public void reserve(int requiredCapacity) {
+    if (requiredCapacity > capacity) {
+      int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
+      if (requiredCapacity <= newCapacity) {
+        reserveInternal(newCapacity);
+      } else {
+        throw new RuntimeException("Cannot reserve more than " + newCapacity +
+            " bytes in the vectorized reader (requested = " + requiredCapacity 
+ " bytes). As a " +
+            "workaround, you can disable the vectorized reader by setting "
+            + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() + " to 
false.");
+      }
+    }
+  }
+
+  /**
    * Ensures that there is enough storage to store capcity elements. That is, 
the put() APIs
    * must work for all rowIds < capcity.
    */
-  public abstract void reserve(int capacity);
+  protected abstract void reserveInternal(int capacity);
 
   /**
    * Returns the number of nulls in this column.
@@ -847,6 +863,12 @@ public abstract class ColumnVector implements 
AutoCloseable {
   protected int capacity;
 
   /**
+   * Upper limit for the maximum capacity for this column.
+   */
+  @VisibleForTesting
+  protected int MAX_CAPACITY = Integer.MAX_VALUE;
+
+  /**
    * Data type for this column.
    */
   protected final DataType type;

http://git-wip-us.apache.org/repos/asf/spark/blob/cc71d4fa/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 70b4a68..913a05a 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -422,13 +422,9 @@ public final class OffHeapColumnVector extends 
ColumnVector {
     array.byteArrayOffset = 0;
   }
 
-  @Override
-  public void reserve(int requiredCapacity) {
-    if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2);
-  }
-
   // Split out the slow path.
-  private void reserveInternal(int newCapacity) {
+  @Override
+  protected void reserveInternal(int newCapacity) {
     if (this.resultArray != null) {
       this.lengthData =
           Platform.reallocateMemory(lengthData, elementsAppended * 4, 
newCapacity * 4);

http://git-wip-us.apache.org/repos/asf/spark/blob/cc71d4fa/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 7fb7617..85067df 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -392,13 +392,9 @@ public final class OnHeapColumnVector extends ColumnVector 
{
     return result;
   }
 
-  @Override
-  public void reserve(int requiredCapacity) {
-    if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2);
-  }
-
   // Spilt this function out since it is the slow path.
-  private void reserveInternal(int newCapacity) {
+  @Override
+  protected void reserveInternal(int newCapacity) {
     if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) {
       int[] newLengths = new int[newCapacity];
       int[] newOffsets = new int[newCapacity];

http://git-wip-us.apache.org/repos/asf/spark/blob/cc71d4fa/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 7e576a8..100cc4d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -787,4 +787,23 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("exceeding maximum capacity should throw an error") {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
+      val column = ColumnVector.allocate(1, ByteType, memMode)
+      column.MAX_CAPACITY = 15
+      column.appendBytes(5, 0.toByte)
+      // Successfully allocate twice the requested capacity
+      assert(column.capacity == 10)
+      column.appendBytes(10, 0.toByte)
+      // Allocated capacity doesn't exceed MAX_CAPACITY
+      assert(column.capacity == 15)
+      val ex = intercept[RuntimeException] {
+        // Over-allocating beyond MAX_CAPACITY throws an exception
+        column.appendBytes(10, 0.toByte)
+      }
+      assert(ex.getMessage.contains(s"Cannot reserve more than 
${column.MAX_CAPACITY} bytes in " +
+        s"the vectorized reader"))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to