This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 3767a332cf08 [SPARK-56844][SQL] Support ArrayType / MapType /
StructType in ConstantColumnVector and FileSourceMetadataAttribute
3767a332cf08 is described below
commit 3767a332cf08349c7e6ebaf0f17bbd7bbbb50e29
Author: Matt Zhang <[email protected]>
AuthorDate: Thu May 14 09:15:08 2026 +0800
[SPARK-56844][SQL] Support ArrayType / MapType / StructType in
ConstantColumnVector and FileSourceMetadataAttribute
### What changes were proposed in this pull request?
Allow `ArrayType`, `MapType`, and `StructType` in file source constant
metadata attributes. Concretely:
- `FileSourceMetadataAttribute.isSupportedType` allows complex types
recursively, contingent on their element types being supported. Behavior for
primitives, decimal, string, binary, interval, and variant is unchanged.
- `ColumnVectorUtils.populate` gains struct/array/map branches:
  - Struct: recurse into pre-allocated child `ConstantColumnVector`s.
  - Array/map: allocate a one-row backing `WritableColumnVector` (an
    `OnHeapColumnVector` or `OffHeapColumnVector`, per the `memMode` argument
    of the new `populate` overload) and reuse the existing
    `RowToColumnConverter` (wrapped in a single-field struct schema) to write
    the constant value. The view is handed to the constant vector along with
    ownership of the backing.
- `ConstantColumnVector` gains optional ownership of a backing
`WritableColumnVector` (closed by `close()`), exposed via new
`setArrayWithBacking` / `setMapWithBacking`. The original `setArray` / `setMap`
are unchanged (caller retains ownership).
- `ConstantColumnVector`'s constructor pre-allocates struct children so
`populate`'s struct recursion has a target. `setChild` closes any
previously-set child to avoid leaking the auto-allocated one.
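
The ownership rule for the backing vector can be illustrated with a minimal stand-alone Java sketch. `OwnershipSketch`, `Backing`, and `ConstantVec` are hypothetical stand-ins, not Spark classes; they only mirror the replace/close logic visible in this commit's `ConstantColumnVector` diff and assume nothing else about the real implementation.

```java
// Hypothetical model of the ownership logic; not Spark code.
final class OwnershipSketch {
  // Stand-in for a WritableColumnVector that records whether it was closed.
  static final class Backing implements AutoCloseable {
    boolean closed = false;
    @Override public void close() { closed = true; }
  }

  // Stand-in for ConstantColumnVector, reduced to the array path.
  static final class ConstantVec implements AutoCloseable {
    private Object arrayData;      // the constant value seen by every row
    private Backing ownedBacking;  // optionally owned storage, closed by close()

    // Caller keeps ownership of whatever storage backs `value`.
    void setArray(Object value) { arrayData = value; }

    // This vector takes ownership of `backing`; a previously owned,
    // different backing is closed first so it does not leak.
    void setArrayWithBacking(Object value, Backing backing) {
      arrayData = value;
      if (ownedBacking != null && ownedBacking != backing) {
        ownedBacking.close();
      }
      ownedBacking = backing;
    }

    @Override public void close() {
      arrayData = null;
      if (ownedBacking != null) {
        ownedBacking.close();
        ownedBacking = null;
      }
    }
  }
}
```

In this model, calling `setArrayWithBacking` twice closes the first backing immediately, and `close()` closes whichever backing is still owned. `setArray` never touches `ownedBacking`.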
Notes on correctness: the recursive copy for array and map reuses
`RowToColumnConverter`, which already drives row-to-columnar conversion across
all supported types (`RowToColumnarExec`). No new per-type dispatch logic is
introduced.
### Why are the changes needed?
`FileSourceMetadataAttribute.isSupportedType` is the lone gate preventing
complex-typed file source constant metadata; the underlying machinery already
supports them.
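
As a rough illustration of the recursive gate, the following stand-alone Java sketch models the shape of the new `isSupportedType` check. The `Type`, `Leaf`, `ArrayT`, `MapT`, and `StructT` classes are hypothetical simplifications, not Spark's `DataType` hierarchy, and `Leaf.supported` stands in for the unchanged physical-type match on non-complex types.

```java
// Hypothetical, simplified type model; not Spark's DataType classes.
final class SupportedTypeSketch {
  interface Type {}

  // A non-complex type; `supported` models the result of the existing leaf check.
  static final class Leaf implements Type {
    final boolean supported;
    Leaf(boolean supported) { this.supported = supported; }
  }

  static final class ArrayT implements Type {
    final Type element;
    ArrayT(Type element) { this.element = element; }
  }

  static final class MapT implements Type {
    final Type key;
    final Type value;
    MapT(Type key, Type value) { this.key = key; this.value = value; }
  }

  static final class StructT implements Type {
    final Type[] fields;
    StructT(Type... fields) { this.fields = fields; }
  }

  // A complex type is supported only if every nested type is supported.
  static boolean isSupported(Type t) {
    if (t instanceof ArrayT) {
      return isSupported(((ArrayT) t).element);
    }
    if (t instanceof MapT) {
      MapT m = (MapT) t;
      return isSupported(m.key) && isSupported(m.value);
    }
    if (t instanceof StructT) {
      for (Type f : ((StructT) t).fields) {
        if (!isSupported(f)) return false;
      }
      return true;
    }
    return t instanceof Leaf && ((Leaf) t).supported;
  }
}
```

Under this model a struct containing an array of an unsupported leaf is rejected, while the same struct with a supported leaf is accepted.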
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New cases in `ColumnVectorUtilsSuite` replacing the previous "not
supported" assertions:
- `fill array of ints`
- `fill array of strings`
- `fill map of int -> boolean`
- `fill struct`
- `fill struct with null field`
- `fill nested struct`
- `fill nested array<struct>` (covers element-level nulls)
- `fill null array`
Existing `ConstantColumnVectorSuite` cases continue to exercise the same
paths.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude (Anthropic)
Closes #55844 from
mzhang/mzhang/SPARK-56844-constant-column-vector-complex-types.
Lead-authored-by: Matt Zhang <[email protected]>
Co-authored-by: Matt Zhang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit e19bc35c7c7d857bb9cfa660ad8bf554dbf308c2)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../catalyst/expressions/namedExpressions.scala | 20 ++--
.../datasources/orc/OrcColumnarBatchReader.java | 3 +-
.../parquet/VectorizedParquetRecordReader.java | 2 +-
.../execution/vectorized/ColumnVectorUtils.java | 45 +++++++-
.../execution/vectorized/ConstantColumnVector.java | 47 ++++++++-
.../sql/execution/datasources/FileScanRDD.scala | 11 +-
.../vectorized/ColumnVectorUtilsSuite.scala | 117 +++++++++++++++++----
7 files changed, 212 insertions(+), 33 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index ccefdc0999ea..d27f140d083b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -727,14 +727,20 @@ object FileSourceMetadataAttribute {
*
* The set of supported types is limited by [[ColumnVectorUtils.populate]], which the constant
* file metadata implementation relies on. In general, types that can be partition columns are
- * supported (including most primitive types). Notably unsupported types include [[ObjectType]],
- * [[UserDefinedType]], and the complex types ([[StructType]], [[MapType]], [[ArrayType]]).
+ * supported (including most primitive types), plus the complex types [[ArrayType]],
+ * [[MapType]], and [[StructType]] (recursively, as long as their element types are supported).
+ * Notably unsupported types include [[ObjectType]] and [[UserDefinedType]].
*/
- def isSupportedType(dataType: DataType): Boolean = PhysicalDataType(dataType) match {
- // PhysicalPrimitiveType covers: Boolean, Byte, Double, Float, Integer, Long, Null, Short
- case _: PhysicalPrimitiveType | _: PhysicalDecimalType => true
- case PhysicalBinaryType | PhysicalStringType(_) | PhysicalCalendarIntervalType => true
- case _ => false
+ def isSupportedType(dataType: DataType): Boolean = dataType match {
+ case ArrayType(elementType, _) => isSupportedType(elementType)
+ case MapType(keyType, valueType, _) => isSupportedType(keyType) && isSupportedType(valueType)
+ case st: StructType => st.fields.forall(f => isSupportedType(f.dataType))
+ case _ => PhysicalDataType(dataType) match {
+ // PhysicalPrimitiveType covers: Boolean, Byte, Double, Float, Integer, Long, Null, Short
+ case _: PhysicalPrimitiveType | _: PhysicalDecimalType => true
+ case PhysicalBinaryType | PhysicalStringType(_) | PhysicalCalendarIntervalType => true
+ case _ => false
+ }
}
/** Returns the type unchanged if valid; otherwise throws [[IllegalArgumentException]]. */
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
index 1c09cc9f7ff2..ea33aa1e2325 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -182,7 +182,8 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
DataType dt = requiredFields[i].dataType();
if (requestedPartitionColIds[i] != -1) {
ConstantColumnVector partitionCol = new ConstantColumnVector(capacity, dt);
- ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]);
+ ColumnVectorUtils.populate(
+ partitionCol, partitionValues, requestedPartitionColIds[i], memoryMode);
orcVectorWrappers[i] = partitionCol;
} else {
int colId = requestedDataColIds[i];
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 72125701fd49..5e782433f557 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -303,7 +303,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
int partitionIdx = sparkSchema.fields().length;
for (int i = 0; i < partitionColumns.fields().length; i++) {
ColumnVectorUtils.populate(
- (ConstantColumnVector) vectors[i + partitionIdx], partitionValues, i);
+ (ConstantColumnVector) vectors[i + partitionIdx], partitionValues, i, MEMORY_MODE);
}
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index 9ff385c995ff..1ca9290e3b7c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -31,8 +31,10 @@ import org.apache.spark.SparkUnsupportedOperationException;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.types.*;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.execution.RowToColumnConverter;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -49,9 +51,22 @@ import org.apache.spark.unsafe.types.VariantVal;
public class ColumnVectorUtils {
/**
- * Populates the value of `row[fieldIdx]` into `ConstantColumnVector`.
+ * Populates the value of `row[fieldIdx]` into `ConstantColumnVector`. For complex types
+ * (array / map) this allocates a small backing `WritableColumnVector` on-heap by default. Use
+ * the {@link #populate(ConstantColumnVector, InternalRow, int, MemoryMode)} overload to control
+ * the backing memory mode.
*/
public static void populate(ConstantColumnVector col, InternalRow row, int fieldIdx) {
+ populate(col, row, fieldIdx, MemoryMode.ON_HEAP);
+ }
+
+ /**
+ * Populates the value of `row[fieldIdx]` into `ConstantColumnVector`. For array / map values,
+ * `memMode` selects on-heap vs off-heap allocation for the backing `WritableColumnVector` that
+ * holds the constant element data; it has no effect on primitive types.
+ */
+ public static void populate(
+ ConstantColumnVector col, InternalRow row, int fieldIdx, MemoryMode memMode) {
DataType t = col.dataType();
PhysicalDataType pdt = PhysicalDataType.apply(t);
@@ -93,6 +108,34 @@ public class ColumnVectorUtils {
col.setCalendarInterval((CalendarInterval) row.get(fieldIdx, t));
} else if (pdt instanceof PhysicalVariantType) {
col.setVariant((VariantVal)row.get(fieldIdx, t));
+ } else if (pdt instanceof PhysicalStructType) {
+ StructType st = (StructType) t;
+ InternalRow inner = row.getStruct(fieldIdx, st.fields().length);
+ InternalRow tmpRow = new GenericInternalRow(1);
+ for (int i = 0; i < st.fields().length; i++) {
+ StructField field = st.fields()[i];
+ tmpRow.update(0, inner.isNullAt(i) ? null : inner.get(i, field.dataType()));
+ // ConstantColumnVector's constructor pre-allocates struct children, so the recursive
+ // populate call below has a target vector to write into.
+ populate((ConstantColumnVector) col.getChild(i), tmpRow, 0, memMode);
+ }
+ } else if (pdt instanceof PhysicalArrayType || pdt instanceof PhysicalMapType) {
+ // Allocate a 1-row backing vector (on-heap or off-heap per `memMode`) to hold the
+ // constant complex value.
+ WritableColumnVector backing = memMode == MemoryMode.OFF_HEAP
+ ? new OffHeapColumnVector(1, t)
+ : new OnHeapColumnVector(1, t);
+ // Reuse RowToColumnConverter by wrapping `t` as a single-field struct schema and
+ // converting the one-row input. This recursively handles all element types correctly.
+ StructType wrapperSchema = new StructType().add("v", t, true);
+ RowToColumnConverter converter = new RowToColumnConverter(wrapperSchema);
+ InternalRow wrapped = new GenericInternalRow(new Object[]{row.get(fieldIdx, t)});
+ converter.convert(wrapped, new WritableColumnVector[]{backing});
+ if (pdt instanceof PhysicalArrayType) {
+ col.setArrayWithBacking(backing.getArray(0), backing);
+ } else {
+ col.setMapWithBacking(backing.getMap(0), backing);
+ }
} else {
throw new RuntimeException(String.format("DataType %s is not supported" +
" in column vectorized reader.", t.sql()));
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
index cd2a82169885..094d6edb6d25 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ConstantColumnVector.java
@@ -49,6 +49,8 @@ public class ConstantColumnVector extends ColumnVector {
private ConstantColumnVector[] childData;
private ColumnarArray arrayData;
private ColumnarMap mapData;
+ // Optionally owned backing storage for arrayData / mapData. Closed by close().
+ private WritableColumnVector ownedBacking;
private final int numRows;
@@ -62,6 +64,9 @@ public class ConstantColumnVector extends ColumnVector {
if (type instanceof StructType structType) {
this.childData = new ConstantColumnVector[structType.fields().length];
+ for (int i = 0; i < structType.fields().length; i++) {
+ this.childData[i] = new ConstantColumnVector(1, structType.fields()[i].dataType());
+ }
} else if (type instanceof CalendarIntervalType) {
// Three columns. Months as int. Days as Int. Microseconds as Long.
this.childData = new ConstantColumnVector[3];
@@ -97,6 +102,10 @@ public class ConstantColumnVector extends ColumnVector {
}
arrayData = null;
mapData = null;
+ if (ownedBacking != null) {
+ ownedBacking.close();
+ ownedBacking = null;
+ }
}
@Override
@@ -218,24 +227,51 @@ public class ConstantColumnVector extends ColumnVector {
}
/**
- * Sets the `ColumnarArray` `value` for all rows
+ * Sets the `ColumnarArray` `value` for all rows. The caller retains ownership of the backing
+ * storage for `value`; use `setArrayWithBacking` if this vector should own and close it.
*/
public void setArray(ColumnarArray value) {
arrayData = value;
}
+ /**
+ * Sets the `ColumnarArray` `value` for all rows and takes ownership of `backing`, which will be
+ * closed when this vector is closed.
+ */
+ public void setArrayWithBacking(ColumnarArray value, WritableColumnVector backing) {
+ arrayData = value;
+ replaceOwnedBacking(backing);
+ }
+
@Override
public ColumnarMap getMap(int ordinal) {
return mapData;
}
/**
- * Sets the `ColumnarMap` `value` for all rows
+ * Sets the `ColumnarMap` `value` for all rows. The caller retains ownership of the backing
+ * storage for `value`; use `setMapWithBacking` if this vector should own and close it.
*/
public void setMap(ColumnarMap value) {
mapData = value;
}
+ /**
+ * Sets the `ColumnarMap` `value` for all rows and takes ownership of `backing`, which will be
+ * closed when this vector is closed.
+ */
+ public void setMapWithBacking(ColumnarMap value, WritableColumnVector backing) {
+ mapData = value;
+ replaceOwnedBacking(backing);
+ }
+
+ private void replaceOwnedBacking(WritableColumnVector backing) {
+ if (ownedBacking != null && ownedBacking != backing) {
+ ownedBacking.close();
+ }
+ ownedBacking = backing;
+ }
+
@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
// copy and modify from WritableColumnVector
@@ -303,9 +339,14 @@ public class ConstantColumnVector extends ColumnVector {
}
/**
- * Sets the child `ConstantColumnVector` `value` at the given ordinal for all rows
+ * Sets the child `ConstantColumnVector` `value` at the given ordinal for all rows. Closes any
+ * previously-set child at this ordinal (e.g., one auto-allocated by the constructor) to avoid
+ * leaking its backing storage.
*/
public void setChild(int ordinal, ConstantColumnVector value) {
+ if (childData[ordinal] != null && childData[ordinal] != value) {
+ childData[ordinal].close();
+ }
childData[ordinal] = value;
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 5dc13ccee9ce..ac1e87de863e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.security.AccessControlException
import org.apache.spark.{Partition => RDDPartition, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.LogKeys.{CURRENT_FILE, PATH}
+import org.apache.spark.memory.MemoryMode
import org.apache.spark.paths.SparkPath
import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
import org.apache.spark.sql.SparkSession
@@ -89,6 +90,14 @@ class FileScanRDD(
private val ignoreCorruptFiles = options.ignoreCorruptFiles
private val ignoreMissingFiles = options.ignoreMissingFiles
+ // Evaluated on the driver (sparkSession is @transient) and serialized to executors so the
+ // `compute` iterator below can pass it through to ColumnVectorUtils.populate.
+ private val memoryMode: MemoryMode =
+ if (sparkSession.sessionState.conf.offHeapColumnVectorEnabled) {
+ MemoryMode.OFF_HEAP
+ } else {
+ MemoryMode.ON_HEAP
+ }
override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
val iterator = new Iterator[Object] with AutoCloseable {
@@ -183,7 +192,7 @@ class FileScanRDD(
}
val columnVector = new ConstantColumnVector(c.numRows(), attr.dataType)
- ColumnVectorUtils.populate(columnVector, tmpRow, 0)
+ ColumnVectorUtils.populate(columnVector, tmpRow, 0, memoryMode)
columnVector
}.toArray
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorUtilsSuite.scala
index 6205484d6be7..b1c0d6c1d7d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorUtilsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorUtilsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.vectorized
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.unsafe.types.UTF8String
@@ -134,30 +135,108 @@ class ColumnVectorUtilsSuite extends SparkFunSuite {
}
}
- testConstantColumnVector("not supported: fill map", 10,
+ testConstantColumnVector("fill array of ints", 10, ArrayType(IntegerType)) { vector =>
+ val arr = new GenericArrayData(Array[Any](1, 2, 3, 4, 5))
+ ColumnVectorUtils.populate(vector, InternalRow(arr), 0)
+ (0 until 10).foreach { i =>
+ assert(vector.getArray(i).toIntArray === Array(1, 2, 3, 4, 5))
+ }
+ }
+
+ testConstantColumnVector("fill array of strings", 10, ArrayType(StringType)) { vector =>
+ val arr = new GenericArrayData(Array[Any](
+ UTF8String.fromString("a"),
+ UTF8String.fromString("bb"),
+ UTF8String.fromString("ccc")))
+ ColumnVectorUtils.populate(vector, InternalRow(arr), 0)
+ (0 until 10).foreach { i =>
+ val a = vector.getArray(i)
+ assert(a.numElements() == 3)
+ assert(a.getUTF8String(0) == UTF8String.fromString("a"))
+ assert(a.getUTF8String(1) == UTF8String.fromString("bb"))
+ assert(a.getUTF8String(2) == UTF8String.fromString("ccc"))
+ }
+ }
+
+ testConstantColumnVector("fill map of int -> boolean", 10,
MapType(IntegerType, BooleanType)) { vector =>
- val message = intercept[RuntimeException] {
- ColumnVectorUtils.populate(vector, InternalRow("fakeMap"), 0)
- }.getMessage
- assert(message == "DataType MAP<INT, BOOLEAN> is not supported in column vectorized reader.")
+ val keys = new GenericArrayData(Array[Any](1, 2, 3))
+ val values = new GenericArrayData(Array[Any](true, false, true))
+ val map = new ArrayBasedMapData(keys, values)
+ ColumnVectorUtils.populate(vector, InternalRow(map), 0)
+ (0 until 10).foreach { i =>
+ val m = vector.getMap(i)
+ assert(m.numElements() == 3)
+ assert(m.keyArray().toIntArray === Array(1, 2, 3))
+ assert(m.valueArray().toBooleanArray === Array(true, false, true))
+ }
}
- testConstantColumnVector("not supported: fill struct", 10,
+ testConstantColumnVector("fill struct", 10,
new StructType()
.add(StructField("name", StringType))
.add(StructField("age", IntegerType))) { vector =>
- val message = intercept[RuntimeException] {
- ColumnVectorUtils.populate(vector, InternalRow("fakeStruct"), 0)
- }.getMessage
- assert(message ==
- "DataType STRUCT<name: STRING, age: INT> is not supported in column vectorized reader.")
- }
-
- testConstantColumnVector("not supported: fill array", 10,
- ArrayType(IntegerType)) { vector =>
- val message = intercept[RuntimeException] {
- ColumnVectorUtils.populate(vector, InternalRow("fakeArray"), 0)
- }.getMessage
- assert(message == "DataType ARRAY<INT> is not supported in column vectorized reader.")
+ val row = InternalRow(UTF8String.fromString("jack"), 27)
+ ColumnVectorUtils.populate(vector, InternalRow(row), 0)
+ (0 until 10).foreach { i =>
+ assert(vector.getChild(0).getUTF8String(i) == UTF8String.fromString("jack"))
+ assert(vector.getChild(1).getInt(i) == 27)
+ }
+ }
+
+ testConstantColumnVector("fill struct with null field", 10,
+ new StructType()
+ .add(StructField("name", StringType, nullable = true))
+ .add(StructField("age", IntegerType))) { vector =>
+ val row = InternalRow(null, 27)
+ ColumnVectorUtils.populate(vector, InternalRow(row), 0)
+ (0 until 10).foreach { i =>
+ assert(vector.getChild(0).isNullAt(i))
+ assert(vector.getChild(1).getInt(i) == 27)
+ }
+ }
+
+ testConstantColumnVector("fill nested struct", 10,
+ new StructType()
+ .add(StructField("inner",
+ new StructType()
+ .add(StructField("k", StringType))
+ .add(StructField("v", IntegerType))))
+ .add(StructField("flag", BooleanType))) { vector =>
+ val inner = InternalRow(UTF8String.fromString("a"), 1)
+ val outer = InternalRow(inner, true)
+ ColumnVectorUtils.populate(vector, InternalRow(outer), 0)
+ (0 until 10).foreach { i =>
+ val s = vector.getChild(0)
+ assert(s.getChild(0).getUTF8String(i) == UTF8String.fromString("a"))
+ assert(s.getChild(1).getInt(i) == 1)
+ assert(vector.getChild(1).getBoolean(i))
+ }
+ }
+
+ testConstantColumnVector("fill nested array<struct>", 10,
+ ArrayType(new StructType()
+ .add(StructField("k", StringType))
+ .add(StructField("v", IntegerType)))) { vector =>
+ val structs = new GenericArrayData(Array[Any](
+ InternalRow(UTF8String.fromString("a"), 1),
+ InternalRow(UTF8String.fromString("bb"), 2),
+ InternalRow(null, 3)))
+ ColumnVectorUtils.populate(vector, InternalRow(structs), 0)
+ (0 until 10).foreach { i =>
+ val a = vector.getArray(i)
+ assert(a.numElements() == 3)
+ assert(a.getStruct(0, 2).getUTF8String(0) == UTF8String.fromString("a"))
+ assert(a.getStruct(0, 2).getInt(1) == 1)
+ assert(a.getStruct(1, 2).getUTF8String(0) == UTF8String.fromString("bb"))
+ assert(a.getStruct(1, 2).getInt(1) == 2)
+ assert(a.getStruct(2, 2).isNullAt(0))
+ assert(a.getStruct(2, 2).getInt(1) == 3)
+ }
+ }
+
+ testConstantColumnVector("fill null array", 10, ArrayType(IntegerType)) { vector =>
+ ColumnVectorUtils.populate(vector, InternalRow(null), 0)
+ assert(vector.hasNull)
}
}