slessard commented on code in PR #10953:
URL: https://github.com/apache/iceberg/pull/10953#discussion_r1773673837
##########
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java:
##########
@@ -140,12 +141,18 @@ public static class ConstantVectorHolder<T> extends VectorHolder {
     private final int numRows;
 
     public ConstantVectorHolder(int numRows) {
+      super(new NullVector("_dummy_", numRows), null, new NullabilityHolder(numRows));
+      nullabilityHolder().setNulls(0, numRows);
       this.numRows = numRows;
       this.constantValue = null;
     }
 
     public ConstantVectorHolder(Types.NestedField icebergField, int numRows, T constantValue) {
-      super(icebergField);
+      super(
+          new NullVector(icebergField.name(), numRows),

Review Comment:
   @nastra, @amogh-jahagirdar This change causes the Spark tests to fail. I've analyzed the failure and concluded that this change, as is, assumes that ConstantVectorHolder will only ever hold a null value. This patch fixes the issue, but it feels a bit tacky to me. What do you think?
   ```
   Subject: [PATCH] Changes
   ---
   Index: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
   IDEA additional info:
   Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
   <+>UTF-8
   ===================================================================
   diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
   --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java	(revision 1a3896bba82e1e92142b237e1cd9332d67171b19)
   +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java	(date 1727193972380)
   @@ -149,10 +149,12 @@
    
      public ConstantVectorHolder(Types.NestedField icebergField, int numRows, T constantValue) {
        super(
   -        new NullVector(icebergField.name(), numRows),
   +        (null == constantValue) ? new NullVector(icebergField.name(), numRows) : null,
          icebergField,
          new NullabilityHolder(numRows));
   -    nullabilityHolder().setNulls(0, numRows);
   +    if (null == constantValue) {
   +      nullabilityHolder().setNulls(0, numRows);
   +    }
        this.numRows = numRows;
        this.constantValue = constantValue;
      }
   ```
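   For context, a minimal sketch of the distinction the patch encodes, using the `ConstantVectorHolder` constructor shown in the hunk. The field, row count, and constant value below are illustrative, not taken from the PR:
   ```java
   import org.apache.iceberg.arrow.vectorized.VectorHolder;
   import org.apache.iceberg.types.Types;

   class ConstantHolderSketch {
     void sketch() {
       // Illustrative inputs, not from the PR.
       Types.NestedField field = Types.NestedField.optional(3, "z", Types.IntegerType.get());
       int numRows = 10;

       // Null constant: every row is null, so backing the holder with a NullVector
       // and marking all positions null via setNulls(0, numRows) is correct.
       VectorHolder allNulls = new VectorHolder.ConstantVectorHolder<Integer>(field, numRows, null);

       // Non-null constant (e.g. a partition value surfaced as a column): no row is
       // null, which is why the patch guards both the NullVector and the setNulls call.
       VectorHolder constant = new VectorHolder.ConstantVectorHolder<>(field, numRows, 42);
     }
   }
   ```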
##########
arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java:
##########
@@ -262,6 +263,89 @@ public void testReadColumnFilter2() throws Exception {
         scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ImmutableList.of("timestamp"));
   }
 
+  @Test
+  public void testReadColumnThatDoesNotExistInParquetSchema() throws Exception {
+    setMaxStackTraceElementsDisplayed(15);
+    rowsWritten = Lists.newArrayList();
+    tables = new HadoopTables();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "a", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "b", Types.IntegerType.get()));
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema).build();
+    Table table1 = tables.create(schema, spec, tableLocation);
+
+    // Add one record to the table
+    GenericRecord rec = GenericRecord.create(schema);
+    rec.setField("a", 1);
+    List<GenericRecord> genericRecords = Lists.newArrayList();
+    genericRecords.add(rec);
+
+    AppendFiles appendFiles = table1.newAppend();
+    appendFiles.appendFile(writeParquetFile(table1, genericRecords));
+    appendFiles.commit();
+
+    // Alter the table schema by adding a new, optional column.
+    // Do not add any data for this new column in the one existing row in the table
+    // and do not insert any new rows into the table.
+    Table table = tables.load(tableLocation);
+    table.updateSchema().addColumn("z", Types.IntegerType.get()).commit();
+
+    // Select all columns, all rows from the table
+    TableScan scan = table.newScan().select("*");
+
+    List<String> columns = ImmutableList.of("a", "b", "z");
+    // Read the data and verify that the returned ColumnarBatches match expected rows.
+    int rowIndex = 0;
+    try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, 1, false)) {
+      for (ColumnarBatch batch : itr) {
+        List<GenericRecord> expectedRows = rowsWritten.subList(rowIndex, rowIndex + 1);
+
+        Map<String, Integer> columnNameToIndex = Maps.newHashMap();
+        for (int i = 0; i < columns.size(); i++) {
+          columnNameToIndex.put(columns.get(i), i);
+        }
+        Set<String> columnSet = columnNameToIndex.keySet();
+
+        assertThat(batch.numRows()).isEqualTo(1);
+        assertThat(batch.numCols()).isEqualTo(columns.size());
+
+        checkColumnarArrayValues(
+            1,
+            expectedRows,
+            batch,
+            0,
+            columnSet,
+            "a",
+            (records, i) -> records.get(i).getField("a"),
+            ColumnVector::getInt);
+        checkColumnarArrayValues(
+            1,
+            expectedRows,
+            batch,
+            1,
+            columnSet,
+            "b",
+            (records, i) -> records.get(i).getField("b"),
+            (array, i) -> array.isNullAt(i) ? null : array.getInt(i));
+        checkColumnarArrayValues(
+            1,
+            expectedRows,
+            batch,
+            2,
+            columnSet,
+            "z",
+            (records, i) -> records.get(i).getField("z"),
+            (array, i) -> array.isNullAt(i) ? null : array.getInt(i));

Review Comment:
   This has been resolved. This is the correct way to read a null value, even for this special case:
   ```
   (array, i) -> array.isNullAt(i) ? null : array.getInt(i)
   ```

##########
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java:
##########
@@ -131,6 +131,20 @@ public boolean isDummy() {
     return vector == null;

Review Comment:
   this method is still needed.
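   For reference, a hedged sketch of the kind of caller this check serves. The branch below is illustrative, not the PR's actual call site, and assumes VectorHolder exposes its backing Arrow vector via a vector() accessor:
   ```java
   import org.apache.iceberg.arrow.vectorized.VectorHolder;

   class DummyCheckSketch {
     // Illustrative only: consumers branch on isDummy() because a dummy holder
     // has no backing Arrow vector to read from.
     static Object valueAt(VectorHolder holder, int rowIndex) {
       if (holder.isDummy()) {
         // No Arrow vector: the value must come from holder metadata (e.g. a
         // constant), not from vector data. Placeholder behavior for the sketch.
         return null;
       }
       return holder.vector().getObject(rowIndex);
     }
   }
   ```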
##########
arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java:
##########
@@ -262,6 +263,89 @@ public void testReadColumnFilter2() throws Exception {
         scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ImmutableList.of("timestamp"));
   }
 
+  @Test
+  public void testReadColumnThatDoesNotExistInParquetSchema() throws Exception {
+    setMaxStackTraceElementsDisplayed(15);
+    rowsWritten = Lists.newArrayList();
+    tables = new HadoopTables();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "a", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "b", Types.IntegerType.get()));
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema).build();
+    Table table1 = tables.create(schema, spec, tableLocation);
+
+    // Add one record to the table
+    GenericRecord rec = GenericRecord.create(schema);
+    rec.setField("a", 1);
+    List<GenericRecord> genericRecords = Lists.newArrayList();
+    genericRecords.add(rec);
+
+    AppendFiles appendFiles = table1.newAppend();
+    appendFiles.appendFile(writeParquetFile(table1, genericRecords));
+    appendFiles.commit();
+
+    // Alter the table schema by adding a new, optional column.
+    // Do not add any data for this new column in the one existing row in the table
+    // and do not insert any new rows into the table.
+    Table table = tables.load(tableLocation);
+    table.updateSchema().addColumn("z", Types.IntegerType.get()).commit();
+
+    // Select all columns, all rows from the table
+    TableScan scan = table.newScan().select("*");
+
+    List<String> columns = ImmutableList.of("a", "b", "z");
+    // Read the data and verify that the returned ColumnarBatches match expected rows.
+    int rowIndex = 0;
+    try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, 1, false)) {
+      for (ColumnarBatch batch : itr) {
+        List<GenericRecord> expectedRows = rowsWritten.subList(rowIndex, rowIndex + 1);
+
+        Map<String, Integer> columnNameToIndex = Maps.newHashMap();
+        for (int i = 0; i < columns.size(); i++) {
+          columnNameToIndex.put(columns.get(i), i);
+        }
+        Set<String> columnSet = columnNameToIndex.keySet();
+
+        assertThat(batch.numRows()).isEqualTo(1);
+        assertThat(batch.numCols()).isEqualTo(columns.size());
+
+        checkColumnarArrayValues(
+            1,
+            expectedRows,
+            batch,
+            0,
+            columnSet,
+            "a",
+            (records, i) -> records.get(i).getField("a"),
+            ColumnVector::getInt);
+        checkColumnarArrayValues(
+            1,
+            expectedRows,
+            batch,
+            1,
+            columnSet,
+            "b",
+            (records, i) -> records.get(i).getField("b"),
+            (array, i) -> array.isNullAt(i) ? null : array.getInt(i));

Review Comment:
   This has been resolved. This is the correct way to read a null value from a column:
   ```
   (array, i) -> array.isNullAt(i) ? null : array.getInt(i)
   ```

##########
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java:
##########
@@ -463,12 +460,16 @@ public static VectorizedArrowReader positionsWithSetArrowValidityVector() {
     return new PositionVectorReader(true);
   }
 
-  private static final class NullVectorReader extends VectorizedArrowReader {
-    private static final NullVectorReader INSTANCE = new NullVectorReader();
+  public static final class NullVectorReader extends VectorizedArrowReader {
+
+    public NullVectorReader(Types.NestedField icebergField) {
+      super(icebergField);
+    }
 
     @Override
     public VectorHolder read(VectorHolder reuse, int numValsToRead) {
-      return VectorHolder.dummyHolder(numValsToRead);
+      NullVector vector = new NullVector(icebergField().name(), numValsToRead);
+      return new VectorHolder.NullVectorHolder(vector, icebergField(), numValsToRead);

Review Comment:
   This has been resolved. This is the correct way to read a null value from a column:
   ```
   (array, i) -> array.isNullAt(i) ? null : array.getInt(i)
   ```
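   A minimal usage sketch of the now-public NullVectorReader, assuming only the constructor and read() behavior shown in the hunk. The field below is illustrative, not from the PR:
   ```java
   import org.apache.iceberg.arrow.vectorized.VectorHolder;
   import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
   import org.apache.iceberg.types.Types;

   class NullReaderSketch {
     void sketch() {
       // Illustrative field: a column that is absent from the Parquet file.
       Types.NestedField missing = Types.NestedField.optional(3, "z", Types.IntegerType.get());

       VectorizedArrowReader.NullVectorReader reader =
           new VectorizedArrowReader.NullVectorReader(missing);

       // Each read() returns a NullVectorHolder backed by a NullVector sized to
       // the batch, so every position in the batch reads as null.
       VectorHolder holder = reader.read(null, 5); // nothing to reuse; 5 rows
     }
   }
   ```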
##########
arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java:
##########
@@ -262,6 +263,89 @@ public void testReadColumnFilter2() throws Exception {
         scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ImmutableList.of("timestamp"));
   }
 
+  @Test
+  public void testReadColumnThatDoesNotExistInParquetSchema() throws Exception {
+    setMaxStackTraceElementsDisplayed(15);
+    rowsWritten = Lists.newArrayList();
+    tables = new HadoopTables();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "a", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "b", Types.IntegerType.get()));
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema).build();
+    Table table1 = tables.create(schema, spec, tableLocation);
+
+    // Add one record to the table
+    GenericRecord rec = GenericRecord.create(schema);
+    rec.setField("a", 1);
+    List<GenericRecord> genericRecords = Lists.newArrayList();
+    genericRecords.add(rec);
+
+    AppendFiles appendFiles = table1.newAppend();
+    appendFiles.appendFile(writeParquetFile(table1, genericRecords));
+    appendFiles.commit();
+
+    // Alter the table schema by adding a new, optional column.
+    // Do not add any data for this new column in the one existing row in the table
+    // and do not insert any new rows into the table.
+    Table table = tables.load(tableLocation);
+    table.updateSchema().addColumn("z", Types.IntegerType.get()).commit();
+
+    // Select all columns, all rows from the table
+    TableScan scan = table.newScan().select("*");
+
+    List<String> columns = ImmutableList.of("a", "b", "z");
+    // Read the data and verify that the returned ColumnarBatches match expected rows.
+    int rowIndex = 0;
+    try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, 1, false)) {
+      for (ColumnarBatch batch : itr) {
+        List<GenericRecord> expectedRows = rowsWritten.subList(rowIndex, rowIndex + 1);
+
+        Map<String, Integer> columnNameToIndex = Maps.newHashMap();
+        for (int i = 0; i < columns.size(); i++) {
+          columnNameToIndex.put(columns.get(i), i);
+        }
+        Set<String> columnSet = columnNameToIndex.keySet();
+
+        assertThat(batch.numRows()).isEqualTo(1);
+        assertThat(batch.numCols()).isEqualTo(columns.size());
+
+        checkColumnarArrayValues(
+            1,
+            expectedRows,
+            batch,
+            0,
+            columnSet,
+            "a",
+            (records, i) -> records.get(i).getField("a"),
+            ColumnVector::getInt);
+        checkColumnarArrayValues(
+            1,
+            expectedRows,
+            batch,
+            1,
+            columnSet,
+            "b",
+            (records, i) -> records.get(i).getField("b"),
+            (array, i) -> array.isNullAt(i) ? null : array.getInt(i));
+        checkColumnarArrayValues(
+            1,
+            expectedRows,
+            batch,
+            2,
+            columnSet,
+            "z",
+            (records, i) -> records.get(i).getField("z"),
+            (array, i) -> array.isNullAt(i) ? null : array.getInt(i));
+        rowIndex += 1;
+      }
+    }
+    // Read the data and verify that the returned Arrow VectorSchemaRoots match expected rows.
+    readAndCheckArrowResult(scan, 1, 1, columns);

Review Comment:
   this method has been inlined.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org