This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fac236e1350 [SPARK-44805][SQL] getBytes/getShorts/getInts/etc. should 
work in a column vector that has a dictionary
fac236e1350 is described below

commit fac236e1350d1c71dd772251709db3af877a69c2
Author: Bruce Robbins <[email protected]>
AuthorDate: Fri Sep 8 12:57:42 2023 -0700

    [SPARK-44805][SQL] getBytes/getShorts/getInts/etc. should work in a column 
vector that has a dictionary
    
    ### What changes were proposed in this pull request?
    
    Change getBytes/getShorts/getInts/getLongs/getFloats/getDoubles in 
`OnHeapColumnVector` and `OffHeapColumnVector` to use the dictionary, if 
present.
    
    ### Why are the changes needed?
    
    The following query gets incorrect results:
    ```
    drop table if exists t1;
    
    create table t1 using parquet as
    select * from values
    (named_struct('f1', array(1, 2, 3), 'f2', array(1, 1, 2)))
    as (value);
    
    select cast(value as struct<f1:array<double>,f2:array<int>>) AS value from 
t1;
    
    {"f1":[1.0,2.0,3.0],"f2":[0,0,0]}
    
    ```
    The result should be:
    ```
    {"f1":[1.0,2.0,3.0],"f2":[1,1,2]}
    ```
    The cast operation copies the second array by calling `ColumnarArray#copy`, 
which in turn calls `ColumnarArray#toIntArray`, which in turn calls 
`ColumnVector#getInts` on the underlying column vector (which is either an 
`OnHeapColumnVector` or an `OffHeapColumnVector`). The implementation of 
`getInts` in either concrete class assumes there is no dictionary and does not 
use it if it is present (in fact, it even asserts that there is no dictionary). 
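However, in the above example, the column vector for `f2` does have a dictionary, because the Parquet file stores that column with dictionary encoding (note `PLAIN_DICTIONARY` for `f2` in the metadata below):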
    ```
    java -cp ~/github/parquet-mr/parquet-tools/target/parquet-tools-1.10.1.jar 
org.apache.parquet.tools.Main meta 
./spark-warehouse/t1/part-00000-122fdd53-8166-407b-aec5-08e0c2845c3d-c000.snappy.parquet
    ...
    row group 1: RC:1 TS:112 OFFSET:4
    
-------------------------------------------------------------------------------------------------------------------------------------------------------
    value:
    .f1:
    ..list:
    ...element:   INT32 SNAPPY DO:0 FPO:4 SZ:47/47/1.00 VC:3 ENC:RLE,PLAIN 
ST:[min: 1, max: 3, num_nulls: 0]
    .f2:
    ..list:
    ...element:   INT32 SNAPPY DO:51 FPO:80 SZ:69/65/0.94 VC:3 
ENC:RLE,PLAIN_DICTIONARY ST:[min: 1, max: 2, num_nulls: 0]
    
    ```
    The same bug also occurs when field f2 is a map. This PR fixes that case as 
well.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, except for fixing the correctness issue.
    
    ### How was this patch tested?
    
    New tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42850 from bersprockets/vector_oddity.
    
    Authored-by: Bruce Robbins <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../sql/execution/columnar/ColumnDictionary.java   | 18 +++--
 .../execution/vectorized/OffHeapColumnVector.java  | 55 +++++++++++----
 .../execution/vectorized/OnHeapColumnVector.java   | 54 +++++++++++----
 .../datasources/parquet/ParquetQuerySuite.scala    | 10 +++
 .../execution/vectorized/ColumnVectorSuite.scala   | 80 +++++++++++++++++++++-
 5 files changed, 186 insertions(+), 31 deletions(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java
index 419dda874d3..29271fc5c0a 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java
@@ -22,6 +22,8 @@ import org.apache.spark.sql.execution.vectorized.Dictionary;
 public final class ColumnDictionary implements Dictionary {
   private int[] intDictionary;
   private long[] longDictionary;
+  private float[] floatDictionary;
+  private double[] doubleDictionary;
 
   public ColumnDictionary(int[] dictionary) {
     this.intDictionary = dictionary;
@@ -31,6 +33,14 @@ public final class ColumnDictionary implements Dictionary {
     this.longDictionary = dictionary;
   }
 
+  public ColumnDictionary(float[] dictionary) {
+    this.floatDictionary = dictionary;
+  }
+
+  public ColumnDictionary(double[] dictionary) {
+    this.doubleDictionary = dictionary;
+  }
+
   @Override
   public int decodeToInt(int id) {
     return intDictionary[id];
@@ -42,14 +52,10 @@ public final class ColumnDictionary implements Dictionary {
   }
 
   @Override
-  public float decodeToFloat(int id) {
-    throw new UnsupportedOperationException("Dictionary encoding does not 
support float");
-  }
+  public float decodeToFloat(int id) { return floatDictionary[id]; }
 
   @Override
-  public double decodeToDouble(int id) {
-    throw new UnsupportedOperationException("Dictionary encoding does not 
support double");
-  }
+  public double decodeToDouble(int id) { return doubleDictionary[id]; }
 
   @Override
   public byte[] decodeToBinary(int id) {
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index bc2636caefd..9cb1b1f0b5e 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -213,9 +213,14 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public byte[] getBytes(int rowId, int count) {
-    assert(dictionary == null);
     byte[] array = new byte[count];
-    Platform.copyMemory(null, data + rowId, array, Platform.BYTE_ARRAY_OFFSET, 
count);
+    if (dictionary == null) {
+      Platform.copyMemory(null, data + rowId, array, 
Platform.BYTE_ARRAY_OFFSET, count);
+    } else {
+      for (int i = 0; i < count; i++) {
+        array[i] = getByte(rowId + i);
+      }
+    }
     return array;
   }
 
@@ -269,9 +274,14 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public short[] getShorts(int rowId, int count) {
-    assert(dictionary == null);
     short[] array = new short[count];
-    Platform.copyMemory(null, data + rowId * 2L, array, 
Platform.SHORT_ARRAY_OFFSET, count * 2L);
+    if (dictionary == null) {
+      Platform.copyMemory(null, data + rowId * 2L, array, 
Platform.SHORT_ARRAY_OFFSET, count * 2L);
+    } else {
+      for (int i = 0; i < count; i++) {
+        array[i] = getShort(rowId + i);
+      }
+    }
     return array;
   }
 
@@ -330,9 +340,14 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public int[] getInts(int rowId, int count) {
-    assert(dictionary == null);
     int[] array = new int[count];
-    Platform.copyMemory(null, data + rowId * 4L, array, 
Platform.INT_ARRAY_OFFSET, count * 4L);
+    if (dictionary == null) {
+      Platform.copyMemory(null, data + rowId * 4L, array, 
Platform.INT_ARRAY_OFFSET, count * 4L);
+    } else {
+      for (int i = 0; i < count; i++) {
+        array[i] = getInt(rowId + i);
+      }
+    }
     return array;
   }
 
@@ -403,9 +418,14 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public long[] getLongs(int rowId, int count) {
-    assert(dictionary == null);
     long[] array = new long[count];
-    Platform.copyMemory(null, data + rowId * 8L, array, 
Platform.LONG_ARRAY_OFFSET, count * 8L);
+    if (dictionary == null) {
+      Platform.copyMemory(null, data + rowId * 8L, array, 
Platform.LONG_ARRAY_OFFSET, count * 8L);
+    } else {
+      for (int i = 0; i < count; i++) {
+        array[i] = getLong(rowId + i);
+      }
+    }
     return array;
   }
 
@@ -462,9 +482,14 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public float[] getFloats(int rowId, int count) {
-    assert(dictionary == null);
     float[] array = new float[count];
-    Platform.copyMemory(null, data + rowId * 4L, array, 
Platform.FLOAT_ARRAY_OFFSET, count * 4L);
+    if (dictionary == null) {
+      Platform.copyMemory(null, data + rowId * 4L, array, 
Platform.FLOAT_ARRAY_OFFSET, count * 4L);
+    } else {
+      for (int i = 0; i < count; i++) {
+        array[i] = getFloat(rowId + i);
+      }
+    }
     return array;
   }
 
@@ -522,9 +547,15 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public double[] getDoubles(int rowId, int count) {
-    assert(dictionary == null);
     double[] array = new double[count];
-    Platform.copyMemory(null, data + rowId * 8L, array, 
Platform.DOUBLE_ARRAY_OFFSET, count * 8L);
+    if (dictionary == null) {
+      Platform.copyMemory(null, data + rowId * 8L, array, 
Platform.DOUBLE_ARRAY_OFFSET,
+        count * 8L);
+    } else {
+      for (int i = 0; i < count; i++) {
+        array[i] = getDouble(rowId + i);
+      }
+    }
     return array;
   }
 
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 56a96907f0f..be590bb9ac7 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -211,9 +211,14 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public byte[] getBytes(int rowId, int count) {
-    assert(dictionary == null);
     byte[] array = new byte[count];
-    System.arraycopy(byteData, rowId, array, 0, count);
+    if (dictionary == null) {
+      System.arraycopy(byteData, rowId, array, 0, count);
+    } else {
+      for (int i = 0; i < count; i++) {
+        array[i] = getByte(rowId + i);
+      }
+    }
     return array;
   }
 
@@ -266,9 +271,14 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public short[] getShorts(int rowId, int count) {
-    assert(dictionary == null);
     short[] array = new short[count];
-    System.arraycopy(shortData, rowId, array, 0, count);
+    if (dictionary == null) {
+      System.arraycopy(shortData, rowId, array, 0, count);
+    } else {
+      for (int i = 0; i < count; i++) {
+        array[i] = getShort(rowId + i);
+      }
+    }
     return array;
   }
 
@@ -322,9 +332,14 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public int[] getInts(int rowId, int count) {
-    assert(dictionary == null);
     int[] array = new int[count];
-    System.arraycopy(intData, rowId, array, 0, count);
+    if (dictionary == null) {
+      System.arraycopy(intData, rowId, array, 0, count);
+    } else {
+      for (int i = 0; i < count; i++) {
+        array[i] = getInt(rowId + i);
+      }
+    }
     return array;
   }
 
@@ -389,9 +404,14 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public long[] getLongs(int rowId, int count) {
-    assert(dictionary == null);
     long[] array = new long[count];
-    System.arraycopy(longData, rowId, array, 0, count);
+    if (dictionary == null) {
+      System.arraycopy(longData, rowId, array, 0, count);
+    } else {
+      for (int i = 0; i < count; i++) {
+        array[i] = getLong(rowId + i);
+      }
+    }
     return array;
   }
 
@@ -441,9 +461,14 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public float[] getFloats(int rowId, int count) {
-    assert(dictionary == null);
     float[] array = new float[count];
-    System.arraycopy(floatData, rowId, array, 0, count);
+    if (dictionary == null) {
+      System.arraycopy(floatData, rowId, array, 0, count);
+    } else {
+      for (int i = 0; i < count; i++) {
+        array[i] = getFloat(rowId + i);
+      }
+    }
     return array;
   }
 
@@ -495,9 +520,14 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public double[] getDoubles(int rowId, int count) {
-    assert(dictionary == null);
     double[] array = new double[count];
-    System.arraycopy(doubleData, rowId, array, 0, count);
+    if (dictionary == null) {
+      System.arraycopy(doubleData, rowId, array, 0, count);
+    } else {
+      for (int i = 0; i < count; i++) {
+        array[i] = getDouble(rowId + i);
+      }
+    }
     return array;
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 16e0e6b4392..2e7b26126d2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -1108,6 +1108,16 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
       checkAnswer(sql("select * from tbl"), expected)
     }
   }
+
+  test("SPARK-44805: cast of struct with two arrays") {
+    withTable("tbl") {
+      sql("create table tbl (value struct<f1:array<int>,f2:array<int>>) using 
parquet")
+      sql("insert into tbl values (named_struct('f1', array(1, 2, 3), 'f2', 
array(1, 1, 2)))")
+      val df = sql("select cast(value as 
struct<f1:array<double>,f2:array<int>>) AS value from tbl")
+      val expected = Row(Row(Array(1.0d, 2.0d, 3.0d), Array(1, 1, 2))) :: Nil
+      checkAnswer(df, expected)
+    }
+  }
 }
 
 class ParquetV1QuerySuite extends ParquetQuerySuite {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index b2b2729e90e..42125c59bb7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.vectorized
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.plans.SQLHelper
-import org.apache.spark.sql.execution.columnar.ColumnAccessor
+import org.apache.spark.sql.execution.columnar.{ColumnAccessor, 
ColumnDictionary}
 import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -397,6 +397,84 @@ class ColumnVectorSuite extends SparkFunSuite with 
SQLHelper {
     assert(testVector.getStruct(1).get(1, DoubleType) === 5.67)
   }
 
+  testVectors("SPARK-44805: getInts with dictionary", 3, IntegerType) { 
testVector =>
+    val dict = new ColumnDictionary(Array[Int](7, 8, 9))
+    testVector.setDictionary(dict)
+    testVector.reserveDictionaryIds(3)
+    testVector.getDictionaryIds.putInt(0, 0)
+    testVector.getDictionaryIds.putInt(1, 1)
+    testVector.getDictionaryIds.putInt(2, 2)
+
+    assert(testVector.getInts(0, 3)(0) == 7)
+    assert(testVector.getInts(0, 3)(1) == 8)
+    assert(testVector.getInts(0, 3)(2) == 9)
+  }
+
+  testVectors("SPARK-44805: getShorts with dictionary", 3, ShortType) { 
testVector =>
+    val dict = new ColumnDictionary(Array[Int](7, 8, 9))
+    testVector.setDictionary(dict)
+    testVector.reserveDictionaryIds(3)
+    testVector.getDictionaryIds.putInt(0, 0)
+    testVector.getDictionaryIds.putInt(1, 1)
+    testVector.getDictionaryIds.putInt(2, 2)
+
+    assert(testVector.getShorts(0, 3)(0) == 7)
+    assert(testVector.getShorts(0, 3)(1) == 8)
+    assert(testVector.getShorts(0, 3)(2) == 9)
+  }
+
+  testVectors("SPARK-44805: getBytes with dictionary", 3, ByteType) { 
testVector =>
+    val dict = new ColumnDictionary(Array[Int](7, 8, 9))
+    testVector.setDictionary(dict)
+    testVector.reserveDictionaryIds(3)
+    testVector.getDictionaryIds.putInt(0, 0)
+    testVector.getDictionaryIds.putInt(1, 1)
+    testVector.getDictionaryIds.putInt(2, 2)
+
+    assert(testVector.getBytes(0, 3)(0) == 7)
+    assert(testVector.getBytes(0, 3)(1) == 8)
+    assert(testVector.getBytes(0, 3)(2) == 9)
+  }
+
+  testVectors("SPARK-44805: getLongs with dictionary", 3, LongType) { 
testVector =>
+    val dict = new ColumnDictionary(Array[Long](2147483648L, 2147483649L, 
2147483650L))
+    testVector.setDictionary(dict)
+    testVector.reserveDictionaryIds(3)
+    testVector.getDictionaryIds.putInt(0, 0)
+    testVector.getDictionaryIds.putInt(1, 1)
+    testVector.getDictionaryIds.putInt(2, 2)
+
+    assert(testVector.getLongs(0, 3)(0) == 2147483648L)
+    assert(testVector.getLongs(0, 3)(1) == 2147483649L)
+    assert(testVector.getLongs(0, 3)(2) == 2147483650L)
+  }
+
+  testVectors("SPARK-44805: getFloats with dictionary", 3, FloatType) { 
testVector =>
+    val dict = new ColumnDictionary(Array[Float](0.1f, 0.2f, 0.3f))
+    testVector.setDictionary(dict)
+    testVector.reserveDictionaryIds(3)
+    testVector.getDictionaryIds.putInt(0, 0)
+    testVector.getDictionaryIds.putInt(1, 1)
+    testVector.getDictionaryIds.putInt(2, 2)
+
+    assert(testVector.getFloats(0, 3)(0) == 0.1f)
+    assert(testVector.getFloats(0, 3)(1) == 0.2f)
+    assert(testVector.getFloats(0, 3)(2) == 0.3f)
+  }
+
+  testVectors("SPARK-44805: getDoubles with dictionary", 3, DoubleType) { 
testVector =>
+    val dict = new ColumnDictionary(Array[Double](1342.17727d, 1342.17728d, 
1342.17729d))
+    testVector.setDictionary(dict)
+    testVector.reserveDictionaryIds(3)
+    testVector.getDictionaryIds.putInt(0, 0)
+    testVector.getDictionaryIds.putInt(1, 1)
+    testVector.getDictionaryIds.putInt(2, 2)
+
+    assert(testVector.getDoubles(0, 3)(0) == 1342.17727d)
+    assert(testVector.getDoubles(0, 3)(1) == 1342.17728d)
+    assert(testVector.getDoubles(0, 3)(2) == 1342.17729d)
+  }
+
   test("[SPARK-22092] off-heap column vector reallocation corrupts array 
data") {
     withVector(new OffHeapColumnVector(8, arrayType)) { testVector =>
       val data = testVector.arrayData()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to