slessard commented on code in PR #10953:
URL: https://github.com/apache/iceberg/pull/10953#discussion_r1773673837


##########
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java:
##########
@@ -140,12 +141,18 @@ public static class ConstantVectorHolder<T> extends VectorHolder {
     private final int numRows;
 
     public ConstantVectorHolder(int numRows) {
+      super(new NullVector("_dummy_", numRows), null, new NullabilityHolder(numRows));
+      nullabilityHolder().setNulls(0, numRows);
       this.numRows = numRows;
       this.constantValue = null;
     }
 
     public ConstantVectorHolder(Types.NestedField icebergField, int numRows, T constantValue) {
-      super(icebergField);
+      super(
+          new NullVector(icebergField.name(), numRows),

Review Comment:
   @nastra, @amogh-jahagirdar This change causes the Spark tests to fail. I've analyzed the failure and concluded that this change, as is, assumes that ConstantVectorHolder will only ever hold a null value. This patch fixes the issue, but it feels a bit tacky to me. What do you think?
   
   ```
   Subject: [PATCH] Changes
   ---
   Index: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
   IDEA additional info:
   Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
   <+>UTF-8
   ===================================================================
   diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
   --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java (revision 1a3896bba82e1e92142b237e1cd9332d67171b19)
   +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java (date 1727193972380)
   @@ -149,10 +149,12 @@
    
        public ConstantVectorHolder(Types.NestedField icebergField, int numRows, T constantValue) {
          super(
   -          new NullVector(icebergField.name(), numRows),
   +          (null == constantValue) ? new NullVector(icebergField.name(), numRows) : null,
              icebergField,
              new NullabilityHolder(numRows));
   -      nullabilityHolder().setNulls(0, numRows);
   +      if (null == constantValue) {
   +        nullabilityHolder().setNulls(0, numRows);
   +      }
          this.numRows = numRows;
          this.constantValue = constantValue;
        }
   ```
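   
   For reference, the case the `null == constantValue` guard protects is a constant column whose value is not null (for example an identity-partition or metadata constant). A minimal sketch of both situations, assuming the constructor from this PR and a `getConstant()` accessor on `ConstantVectorHolder`; the field, row count, and value are made up:
   
   ```
   import org.apache.iceberg.arrow.vectorized.VectorHolder.ConstantVectorHolder;
   import org.apache.iceberg.types.Types;
   
   class ConstantHolderSketch {
     public static void main(String[] args) {
       Types.NestedField field = Types.NestedField.optional(3, "part", Types.IntegerType.get());
   
       // Non-null constant: unconditionally calling setNulls(0, numRows) would make
       // every position read as null even though the column's value is 42.
       ConstantVectorHolder<Integer> nonNull = new ConstantVectorHolder<>(field, 10, 42);
       System.out.println(nonNull.getConstant()); // expected: 42 (assumed accessor)
   
       // Null constant (column absent from the data file): an all-null
       // nullability buffer is exactly what we want here.
       ConstantVectorHolder<Integer> allNull = new ConstantVectorHolder<>(field, 10, null);
       System.out.println(allNull.getConstant()); // expected: null
     }
   }
   ```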



##########
arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java:
##########
@@ -262,6 +263,89 @@ public void testReadColumnFilter2() throws Exception {
        scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ImmutableList.of("timestamp"));
   }
 
+  @Test
+  public void testReadColumnThatDoesNotExistInParquetSchema() throws Exception {
+    setMaxStackTraceElementsDisplayed(15);
+    rowsWritten = Lists.newArrayList();
+    tables = new HadoopTables();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "a", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "b", Types.IntegerType.get()));
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema).build();
+    Table table1 = tables.create(schema, spec, tableLocation);
+
+    // Add one record to the table
+    GenericRecord rec = GenericRecord.create(schema);
+    rec.setField("a", 1);
+    List<GenericRecord> genericRecords = Lists.newArrayList();
+    genericRecords.add(rec);
+
+    AppendFiles appendFiles = table1.newAppend();
+    appendFiles.appendFile(writeParquetFile(table1, genericRecords));
+    appendFiles.commit();
+
+    // Alter the table schema by adding a new, optional column.
+    // Do not add any data for this new column in the one existing row in the table
+    // and do not insert any new rows into the table.
+    Table table = tables.load(tableLocation);
+    table.updateSchema().addColumn("z", Types.IntegerType.get()).commit();
+
+    // Select all columns, all rows from the table
+    TableScan scan = table.newScan().select("*");
+
+    List<String> columns = ImmutableList.of("a", "b", "z");
+    // Read the data and verify that the returned ColumnarBatches match expected rows.
+    int rowIndex = 0;
+    try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, 1, false)) {
+      for (ColumnarBatch batch : itr) {
+        List<GenericRecord> expectedRows = rowsWritten.subList(rowIndex, rowIndex + 1);
+
+        Map<String, Integer> columnNameToIndex = Maps.newHashMap();
+        for (int i = 0; i < columns.size(); i++) {
+          columnNameToIndex.put(columns.get(i), i);
+        }
+        Set<String> columnSet = columnNameToIndex.keySet();
+
+        assertThat(batch.numRows()).isEqualTo(1);
+        assertThat(batch.numCols()).isEqualTo(columns.size());
+
+        checkColumnarArrayValues(
+            1,
+            expectedRows,
+            batch,
+            0,
+            columnSet,
+            "a",
+            (records, i) -> records.get(i).getField("a"),
+            ColumnVector::getInt);
+        checkColumnarArrayValues(
+            1,
+            expectedRows,
+            batch,
+            1,
+            columnSet,
+            "b",
+            (records, i) -> records.get(i).getField("b"),
+            (array, i) -> array.isNullAt(i) ? null : array.getInt(i));
+        checkColumnarArrayValues(
+            1,
+            expectedRows,
+            batch,
+            2,
+            columnSet,
+            "z",
+            (records, i) -> records.get(i).getField("z"),
+            (array, i) -> array.isNullAt(i) ? null : array.getInt(i));

Review Comment:
   This has been resolved. This is the correct way to read a null value, even for this special case:
   
   ```
   (array, i) -> array.isNullAt(i) ? null : array.getInt(i)
   ```
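   
   For completeness, a small illustration of the difference between the two accessor shapes used in this test; ColumnVector is the Iceberg arrow-module class already used above, and the BiFunction shape is illustrative rather than the test helper's exact signature:
   
   ```
   import java.util.function.BiFunction;
   import org.apache.iceberg.arrow.vectorized.ColumnVector;
   
   class NullSafeRead {
     // Fine for required columns, but reads the slot even when it is null.
     static final BiFunction<ColumnVector, Integer, Object> UNCHECKED = ColumnVector::getInt;
   
     // Correct for nullable columns (including the all-null "z" column): check first.
     static final BiFunction<ColumnVector, Integer, Object> NULL_SAFE =
         (array, i) -> array.isNullAt(i) ? null : array.getInt(i);
   }
   ```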



##########
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java:
##########
@@ -131,6 +131,20 @@ public boolean isDummy() {
     return vector == null;

Review Comment:
   this method is still needed.
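   
   For context, a minimal sketch of the dispatch pattern that still relies on it, assuming the public vector() accessor on VectorHolder; the consumer class and method here are illustrative, not taken from this PR:
   
   ```
   import org.apache.arrow.vector.FieldVector;
   import org.apache.iceberg.arrow.vectorized.VectorHolder;
   
   class HolderDispatchSketch {
     // Callers check isDummy() before touching the Arrow vector, because a
     // dummy holder has no backing vector at all (vector == null).
     static int valueCount(VectorHolder holder, int numRows) {
       if (holder.isDummy()) {
         return numRows; // nothing to inspect for a dummy holder
       }
       FieldVector vector = holder.vector();
       return vector.getValueCount();
     }
   }
   ```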



##########
arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java:
##########
@@ -262,6 +263,89 @@ public void testReadColumnFilter2() throws Exception {
        scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ImmutableList.of("timestamp"));
   }
 
+  @Test
+  public void testReadColumnThatDoesNotExistInParquetSchema() throws Exception {
+    setMaxStackTraceElementsDisplayed(15);
+    rowsWritten = Lists.newArrayList();
+    tables = new HadoopTables();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "a", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "b", Types.IntegerType.get()));
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema).build();
+    Table table1 = tables.create(schema, spec, tableLocation);
+
+    // Add one record to the table
+    GenericRecord rec = GenericRecord.create(schema);
+    rec.setField("a", 1);
+    List<GenericRecord> genericRecords = Lists.newArrayList();
+    genericRecords.add(rec);
+
+    AppendFiles appendFiles = table1.newAppend();
+    appendFiles.appendFile(writeParquetFile(table1, genericRecords));
+    appendFiles.commit();
+
+    // Alter the table schema by adding a new, optional column.
+    // Do not add any data for this new column in the one existing row in the table
+    // and do not insert any new rows into the table.
+    Table table = tables.load(tableLocation);
+    table.updateSchema().addColumn("z", Types.IntegerType.get()).commit();
+
+    // Select all columns, all rows from the table
+    TableScan scan = table.newScan().select("*");
+
+    List<String> columns = ImmutableList.of("a", "b", "z");
+    // Read the data and verify that the returned ColumnarBatches match expected rows.
+    int rowIndex = 0;
+    try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, 1, false)) {
+      for (ColumnarBatch batch : itr) {
+        List<GenericRecord> expectedRows = rowsWritten.subList(rowIndex, rowIndex + 1);
+
+        Map<String, Integer> columnNameToIndex = Maps.newHashMap();
+        for (int i = 0; i < columns.size(); i++) {
+          columnNameToIndex.put(columns.get(i), i);
+        }
+        Set<String> columnSet = columnNameToIndex.keySet();
+
+        assertThat(batch.numRows()).isEqualTo(1);
+        assertThat(batch.numCols()).isEqualTo(columns.size());
+
+        checkColumnarArrayValues(
+            1,
+            expectedRows,
+            batch,
+            0,
+            columnSet,
+            "a",
+            (records, i) -> records.get(i).getField("a"),
+            ColumnVector::getInt);
+        checkColumnarArrayValues(
+            1,
+            expectedRows,
+            batch,
+            1,
+            columnSet,
+            "b",
+            (records, i) -> records.get(i).getField("b"),
+            (array, i) -> array.isNullAt(i) ? null : array.getInt(i));

Review Comment:
   This has been resolved. This is the correct way to read a null value from a column:
   ```
   (array, i) -> array.isNullAt(i) ? null : array.getInt(i)
   ```



##########
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java:
##########
@@ -463,12 +460,16 @@ public static VectorizedArrowReader positionsWithSetArrowValidityVector() {
     return new PositionVectorReader(true);
   }
 
-  private static final class NullVectorReader extends VectorizedArrowReader {
-    private static final NullVectorReader INSTANCE = new NullVectorReader();
+  public static final class NullVectorReader extends VectorizedArrowReader {
+
+    public NullVectorReader(Types.NestedField icebergField) {
+      super(icebergField);
+    }
 
     @Override
     public VectorHolder read(VectorHolder reuse, int numValsToRead) {
-      return VectorHolder.dummyHolder(numValsToRead);
+      NullVector vector = new NullVector(icebergField().name(), numValsToRead);
+      return new VectorHolder.NullVectorHolder(vector, icebergField(), numValsToRead);

Review Comment:
   This has been resolved. This is the correct way to read a null value from a column:
   ```
   (array, i) -> array.isNullAt(i) ? null : array.getInt(i)
   ```
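   
   Separately, a minimal usage sketch of the NullVectorReader shown in this hunk, assuming the public constructor added in this PR; the field and batch size are made up:
   
   ```
   import org.apache.iceberg.arrow.vectorized.VectorHolder;
   import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
   import org.apache.iceberg.types.Types;
   
   class NullVectorReaderSketch {
     static VectorHolder readMissingColumn() {
       Types.NestedField zField = Types.NestedField.optional(3, "z", Types.IntegerType.get());
       VectorizedArrowReader.NullVectorReader reader =
           new VectorizedArrowReader.NullVectorReader(zField);
       // Returns a NullVectorHolder backed by a NullVector named "z" with 100 all-null slots.
       return reader.read(null, 100);
     }
   }
   ```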



##########
arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java:
##########
@@ -262,6 +263,89 @@ public void testReadColumnFilter2() throws Exception {
        scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ImmutableList.of("timestamp"));
   }
 
+  @Test
+  public void testReadColumnThatDoesNotExistInParquetSchema() throws Exception {
+    setMaxStackTraceElementsDisplayed(15);
+    rowsWritten = Lists.newArrayList();
+    tables = new HadoopTables();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "a", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "b", Types.IntegerType.get()));
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema).build();
+    Table table1 = tables.create(schema, spec, tableLocation);
+
+    // Add one record to the table
+    GenericRecord rec = GenericRecord.create(schema);
+    rec.setField("a", 1);
+    List<GenericRecord> genericRecords = Lists.newArrayList();
+    genericRecords.add(rec);
+
+    AppendFiles appendFiles = table1.newAppend();
+    appendFiles.appendFile(writeParquetFile(table1, genericRecords));
+    appendFiles.commit();
+
+    // Alter the table schema by adding a new, optional column.
+    // Do not add any data for this new column in the one existing row in the table
+    // and do not insert any new rows into the table.
+    Table table = tables.load(tableLocation);
+    table.updateSchema().addColumn("z", Types.IntegerType.get()).commit();
+
+    // Select all columns, all rows from the table
+    TableScan scan = table.newScan().select("*");
+
+    List<String> columns = ImmutableList.of("a", "b", "z");
+    // Read the data and verify that the returned ColumnarBatches match expected rows.
+    int rowIndex = 0;
+    try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, 1, false)) {
+      for (ColumnarBatch batch : itr) {
+        List<GenericRecord> expectedRows = rowsWritten.subList(rowIndex, rowIndex + 1);
+
+        Map<String, Integer> columnNameToIndex = Maps.newHashMap();
+        for (int i = 0; i < columns.size(); i++) {
+          columnNameToIndex.put(columns.get(i), i);
+        }
+        Set<String> columnSet = columnNameToIndex.keySet();
+
+        assertThat(batch.numRows()).isEqualTo(1);
+        assertThat(batch.numCols()).isEqualTo(columns.size());
+
+        checkColumnarArrayValues(
+            1,
+            expectedRows,
+            batch,
+            0,
+            columnSet,
+            "a",
+            (records, i) -> records.get(i).getField("a"),
+            ColumnVector::getInt);
+        checkColumnarArrayValues(
+            1,
+            expectedRows,
+            batch,
+            1,
+            columnSet,
+            "b",
+            (records, i) -> records.get(i).getField("b"),
+            (array, i) -> array.isNullAt(i) ? null : array.getInt(i));
+        checkColumnarArrayValues(
+            1,
+            expectedRows,
+            batch,
+            2,
+            columnSet,
+            "z",
+            (records, i) -> records.get(i).getField("z"),
+            (array, i) -> array.isNullAt(i) ? null : array.getInt(i));
+        rowIndex += 1;
+      }
+    }
+    // Read the data and verify that the returned Arrow VectorSchemaRoots match expected rows.
+    readAndCheckArrowResult(scan, 1, 1, columns);

Review Comment:
   this method has been inlined


