Repository: spark Updated Branches: refs/heads/branch-2.0 e8d1bf60a -> edd5dd24a
[SPARK-16123] Avoid NegativeArraySizeException while reserving additional capacity in VectorizedColumnReader ## What changes were proposed in this pull request? This patch fixes an overflow bug in vectorized parquet reader where both off-heap and on-heap variants of `ColumnVector.reserve()` can unfortunately overflow while reserving additional capacity during reads. ## How was this patch tested? Manual Tests Author: Sameer Agarwal <[email protected]> Closes #13832 from sameeragarwal/negative-array. (cherry picked from commit cc71d4fa372f6eb187c68dbd8358de4003ace3fe) Signed-off-by: Herman van Hovell <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edd5dd24 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edd5dd24 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edd5dd24 Branch: refs/heads/branch-2.0 Commit: edd5dd24a56f1316d14ca23261cea5b49b16adf2 Parents: e8d1bf6 Author: Sameer Agarwal <[email protected]> Authored: Thu Jun 23 18:21:41 2016 -0700 Committer: Herman van Hovell <[email protected]> Committed: Thu Jun 23 18:21:56 2016 -0700 ---------------------------------------------------------------------- .../sql/execution/vectorized/ColumnVector.java | 26 ++++++++++++++++++-- .../vectorized/OffHeapColumnVector.java | 8 ++---- .../vectorized/OnHeapColumnVector.java | 8 ++---- .../vectorized/ColumnarBatchSuite.scala | 19 ++++++++++++++ 4 files changed, 47 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/edd5dd24/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index 3f94255..80c84b1 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.vectorized; import java.math.BigDecimal; import java.math.BigInteger; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.NotImplementedException; import org.apache.parquet.column.Dictionary; import org.apache.parquet.io.api.Binary; @@ -27,6 +28,7 @@ import org.apache.spark.memory.MemoryMode; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -277,11 +279,25 @@ public abstract class ColumnVector implements AutoCloseable { */ public abstract void close(); - /* + public void reserve(int requiredCapacity) { + if (requiredCapacity > capacity) { + int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L); + if (requiredCapacity <= newCapacity) { + reserveInternal(newCapacity); + } else { + throw new RuntimeException("Cannot reserve more than " + newCapacity + + " bytes in the vectorized reader (requested = " + requiredCapacity + " bytes). As a " + + "workaround, you can disable the vectorized reader by setting " + + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() + " to false."); + } + } + } + + /** * Ensures that there is enough storage to store capcity elements. That is, the put() APIs * must work for all rowIds < capcity. */ - public abstract void reserve(int capacity); + protected abstract void reserveInternal(int capacity); /** * Returns the number of nulls in this column. @@ -847,6 +863,12 @@ public abstract class ColumnVector implements AutoCloseable { protected int capacity; /** + * Upper limit for the maximum capacity for this column. + */ + @VisibleForTesting + protected int MAX_CAPACITY = Integer.MAX_VALUE; + + /** * Data type for this column. */ protected final DataType type; http://git-wip-us.apache.org/repos/asf/spark/blob/edd5dd24/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 70b4a68..913a05a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -422,13 +422,9 @@ public final class OffHeapColumnVector extends ColumnVector { array.byteArrayOffset = 0; } - @Override - public void reserve(int requiredCapacity) { - if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2); - } - // Split out the slow path. - private void reserveInternal(int newCapacity) { + @Override + protected void reserveInternal(int newCapacity) { if (this.resultArray != null) { this.lengthData = Platform.reallocateMemory(lengthData, elementsAppended * 4, newCapacity * 4); http://git-wip-us.apache.org/repos/asf/spark/blob/edd5dd24/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 7fb7617..85067df 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -392,13 +392,9 @@ public final class OnHeapColumnVector extends ColumnVector { return result; } - @Override - public void reserve(int requiredCapacity) { - if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2); - } - // Spilt this function out since it is the slow path. - private void reserveInternal(int newCapacity) { + @Override + protected void reserveInternal(int newCapacity) { if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) { int[] newLengths = new int[newCapacity]; int[] newOffsets = new int[newCapacity]; http://git-wip-us.apache.org/repos/asf/spark/blob/edd5dd24/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 7e576a8..100cc4d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -787,4 +787,23 @@ class ColumnarBatchSuite extends SparkFunSuite { } } } + + test("exceeding maximum capacity should throw an error") { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => + val column = ColumnVector.allocate(1, ByteType, memMode) + column.MAX_CAPACITY = 15 + column.appendBytes(5, 0.toByte) + // Successfully allocate twice the requested capacity + assert(column.capacity == 10) + column.appendBytes(10, 0.toByte) + // Allocated capacity doesn't exceed MAX_CAPACITY + assert(column.capacity == 15) + val ex = intercept[RuntimeException] { + // Over-allocating beyond MAX_CAPACITY throws an exception + column.appendBytes(10, 0.toByte) + } + assert(ex.getMessage.contains(s"Cannot reserve more than ${column.MAX_CAPACITY} bytes in " + + s"the vectorized reader")) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
