This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0c6d7fd277ba [SPARK-52651][SQL] Handle User Defined Type in Nested
ColumnVector
0c6d7fd277ba is described below
commit 0c6d7fd277ba39899795835ca864402405724ad4
Author: Kent Yao <[email protected]>
AuthorDate: Thu Jul 3 09:48:25 2025 +0800
[SPARK-52651][SQL] Handle User Defined Type in Nested ColumnVector
### What changes were proposed in this pull request?
When I read a map column with a nested UDT, I encountered:
```
Caused by: java.lang.IllegalArgumentException: Spark type: ... doesn't
match the type: ... in column vector
at
org.apache.spark.sql.execution.datasources.parquet.ParquetColumnVector.<init>(ParquetColumnVector.java:80)
at
org.apache.spark.sql.execution.datasources.parquet.ParquetColumnVector.<init>(ParquetColumnVector.java:139)
```
This PR adds a recursive transformation that replaces any nested UDT with its underlying SQL type
### Why are the changes needed?
Adds missing handling for UDTs nested inside complex types (map/array/struct)
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New Tests
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #51349 from yaooqinn/SPARK-52651.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../apache/spark/sql/vectorized/ColumnVector.java | 23 +++++++++++++++-----
.../parquet/ParquetSchemaConverter.scala | 3 +--
.../execution/vectorized/ColumnVectorSuite.scala | 25 +++++++++++++++++++++-
3 files changed, 43 insertions(+), 8 deletions(-)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index 54b62c00283f..f1d1f5b3ea80 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.vectorized;
+import scala.PartialFunction;
+
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
@@ -336,10 +338,21 @@ public abstract class ColumnVector implements
AutoCloseable {
* Sets up the data type of this column vector.
*/
protected ColumnVector(DataType type) {
- if (type instanceof UserDefinedType) {
- this.type = ((UserDefinedType) type).sqlType();
- } else {
- this.type = type;
- }
+ this.type = type.transformRecursively(
+ new PartialFunction<DataType, DataType>() {
+ @Override
+ public boolean isDefinedAt(DataType x) {
+ return x instanceof UserDefinedType<?>;
+ }
+
+ @Override
+ public DataType apply(DataType t) {
+ if (t instanceof UserDefinedType<?> udt) {
+ return udt.sqlType();
+ } else {
+ return t;
+ }
+ }
+ });
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index e05d5fe2fd88..16bd776bea0c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -196,8 +196,7 @@ class ParquetToSparkSchemaConverter(
field: ColumnIO,
sparkReadType: Option[DataType] = None): ParquetColumn = {
val targetType = sparkReadType.map {
- case udt: UserDefinedType[_] => udt.sqlType
- case otherType => otherType
+ _.transformRecursively { case t: UserDefinedType[_] => t.sqlType }
}
field match {
case primitiveColumn: PrimitiveColumnIO =>
convertPrimitiveField(primitiveColumn, targetType)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 0edbfd10d8cd..a0fe44b96e7d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.vectorized
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.YearUDT
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.execution.columnar.{ColumnAccessor,
ColumnDictionary}
@@ -926,5 +927,27 @@ class ColumnVectorSuite extends SparkFunSuite with
SQLHelper {
}
}
}
-}
+ val yearUDT = new YearUDT
+ testVectors("user defined type", 10, yearUDT) { testVector =>
+ assert(testVector.dataType() === IntegerType)
+ (0 until 10).foreach { i =>
+ testVector.appendInt(i)
+ }
+ }
+
+ testVectors("user defined type in map type",
+ 10, MapType(IntegerType, yearUDT)) { testVector =>
+ assert(testVector.dataType() === MapType(IntegerType, IntegerType))
+ }
+
+ testVectors("user defined type in array type",
+ 10, ArrayType(yearUDT, containsNull = true)) { testVector =>
+ assert(testVector.dataType() === ArrayType(IntegerType, containsNull =
true))
+ }
+
+ testVectors("user defined type in struct type",
+ 10, StructType(Seq(StructField("year", yearUDT)))) { testVector =>
+ assert(testVector.dataType() === StructType(Seq(StructField("year",
IntegerType))))
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]