flyrain commented on code in PR #11551: URL: https://github.com/apache/iceberg/pull/11551#discussion_r1863894343
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java:
##########
@@ -245,5 +259,16 @@ void applyEqDelete(ColumnarBatch columnarBatch) {
     columnarBatch.setNumRows(currentRowId);
   }
+
+  ColumnarBatch removeExtraColumnsFromColumnarBatch(

Review Comment:
   Minor suggestion: Simplify to `removeExtraColumns`?

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java:
##########
@@ -45,11 +45,23 @@ public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
   private final boolean hasIsDeletedColumn;
   private DeleteFilter<InternalRow> deletes = null;
   private long rowStartPosInBatch = 0;
+  // In the case of Equality Delete, we have also built ColumnarBatchReader for the equality delete
+  // filter columns to read the value to find out which rows are deleted. If these deleted filter
+  // columns are not in the requested schema, then these are the extra columns that we want to
+  // remove before return the ColumnBatch to Spark.
+  // Supposed table schema is C1, C2, C3, C4, C5, The query is:
+  // SELECT C5 FROM table, and the equality delete Filter is on C3, C4,
+  // We read the values of C3, C4 to figure out which rows are deleted, but we don't want to include
+  // these values in the ColumnBatch that we return to Spark. In this example, the numOfExtraColumns
+  // is 2. Since when creating the DeleteFilter, we append these extra columns in the end of the
+  // requested schema, we can just remove them from the end of the ColumnVector.
+  private int numOfExtraColumns = 0;
-  public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
+  public ColumnarBatchReader(List<VectorizedReader<?>> readers, int numExtraCol) {

Review Comment:
   Can we use the `DeleteFilter` (field `deletes`) within the class, so that no extra parameter is required? We could move the method `numOfExtraColumns` to this class in that case.
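For illustration, the trailing-column removal discussed above could look roughly like the sketch below. This is not the PR's code: `TrimColumnsSketch` and its generic `removeExtraColumns` helper are hypothetical, and a plain array stands in for the batch's column vectors. It assumes the extra columns really do sit at the end of the array.

```java
import java.util.Arrays;

public class TrimColumnsSketch {
  /**
   * Returns the leading columns, dropping the last numOfExtraColumns entries.
   * Hypothetical helper: the array stands in for the column vectors of a batch.
   */
  public static <T> T[] removeExtraColumns(T[] columns, int numOfExtraColumns) {
    if (numOfExtraColumns < 0 || numOfExtraColumns > columns.length) {
      throw new IllegalArgumentException(
          "Invalid number of extra columns: " + numOfExtraColumns);
    }
    if (numOfExtraColumns == 0) {
      return columns; // nothing to trim, so reuse the original array
    }
    // Copy only the leading (requested) columns into a shorter array.
    return Arrays.copyOf(columns, columns.length - numOfExtraColumns);
  }

  public static void main(String[] args) {
    // Query selects C5; C3 and C4 were read only to evaluate equality deletes.
    String[] read = {"C5", "C3", "C4"};
    System.out.println(Arrays.toString(removeExtraColumns(read, 2))); // prints [C5]
  }
}
```

Returning the original array when there is nothing to trim avoids a copy on the common path where no equality deletes apply.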
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java:
##########
@@ -125,4 +126,25 @@ protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorder
       return reader;
     }
   }
+
+  private static int numOfExtraColumns(DeleteFilter deleteFilter) {

Review Comment:
   minor: `DeleteFilter` -> `DeleteFilter<InternalRow>`

##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -73,6 +74,7 @@ protected DeleteFilter(
       boolean needRowPosCol) {
     this.filePath = filePath;
     this.counter = counter;
+    this.requestedSchema = requestedSchema;

Review Comment:
   Can we be consistent with the name? `requestedSchema` or `expectedSchema`? I guess `expectedSchema` is more commonly used.

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java:
##########
@@ -125,4 +126,25 @@ protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorder
       return reader;
     }
   }
+
+  private static int numOfExtraColumns(DeleteFilter deleteFilter) {
+    if (deleteFilter != null) {
+      if (deleteFilter.hasEqDeletes()) {
+        // For Equality Delete, the requiredColumns and expectedColumns may not be the
+        // same. For example, supposed table schema is C1, C2, C3, C4, C5, The query is:
+        // SELECT C5 FROM table, and the equality delete Filter is on C3, C4, then
+        // the requestedSchema is C5, and the required schema is C5, C3 and C4. The
+        // vectorized reader reads also need to read C3 and C4 columns to figure out
+        // which rows are deleted. However, after figuring out the deleted rows, the
+        // extra columns values are not needed to returned to Spark.
+        // Getting the numOfExtraColumns so we can remove these extra columns
+        // from ColumnBatch later.

Review Comment:
   Can we move the comment to the method's Javadoc?
##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -622,6 +624,41 @@ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio
     assertThat(rowSet(tblName, tbl, "*")).hasSize(193);
   }
+  @TestTemplate
+  public void testEqualityDeleteWithDifferentScanAndDeleteColumns() throws IOException {

Review Comment:
   Is it possible to check the intermediate results? For example, by checking the `ColumnarBatch` returned to Spark. We may avoid using `comet` as a dependency for the test.

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java:
##########
@@ -125,4 +126,25 @@ protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorder
       return reader;
     }
   }
+
+  private static int numOfExtraColumns(DeleteFilter deleteFilter) {
+    if (deleteFilter != null) {
+      if (deleteFilter.hasEqDeletes()) {
+        // For Equality Delete, the requiredColumns and expectedColumns may not be the
+        // same. For example, supposed table schema is C1, C2, C3, C4, C5, The query is:
+        // SELECT C5 FROM table, and the equality delete Filter is on C3, C4, then
+        // the requestedSchema is C5, and the required schema is C5, C3 and C4. The
+        // vectorized reader reads also need to read C3 and C4 columns to figure out
+        // which rows are deleted. However, after figuring out the deleted rows, the
+        // extra columns values are not needed to returned to Spark.
+        // Getting the numOfExtraColumns so we can remove these extra columns
+        // from ColumnBatch later.
+        List<Types.NestedField> requiredColumns = deleteFilter.requiredSchema().columns();
+        List<Types.NestedField> expectedColumns = deleteFilter.requestedSchema().columns();
+        return requiredColumns.size() - expectedColumns.size();

Review Comment:
   Are we sure the extra columns are consistently appended to the end of the array?
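One way to make the "appended to the end" assumption explicit is to verify it before subtracting the sizes, as in the rough sketch below. This is only an illustration under stated assumptions: `ExtraColumnsSketch` is a hypothetical class, and lists of column names stand in for the `Types.NestedField` lists returned by the filter's schemas. The explanation also lives in the Javadoc rather than in an inline comment.

```java
import java.util.List;

public class ExtraColumnsSketch {
  /**
   * Counts the columns that were read only to evaluate equality deletes.
   *
   * <p>Example: for SELECT C5 with an equality delete on C3 and C4, the expected
   * columns are [C5] and the required columns are [C5, C3, C4], so the result is 2.
   *
   * <p>Hypothetical helper: column names stand in for schema fields. It fails fast
   * if the expected columns are not a prefix of the required columns, because
   * trimming from the end would then drop the wrong columns.
   */
  public static int numOfExtraColumns(List<String> expectedColumns, List<String> requiredColumns) {
    if (requiredColumns.size() < expectedColumns.size()) {
      throw new IllegalStateException("Required columns are fewer than expected columns");
    }
    for (int i = 0; i < expectedColumns.size(); i++) {
      if (!expectedColumns.get(i).equals(requiredColumns.get(i))) {
        throw new IllegalStateException(
            "Extra columns are not appended at the end; mismatch at position " + i);
      }
    }
    return requiredColumns.size() - expectedColumns.size();
  }

  public static void main(String[] args) {
    System.out.println(numOfExtraColumns(List.of("C5"), List.of("C5", "C3", "C4"))); // prints 2
  }
}
```

A check like this costs one pass over the expected columns per reader construction, and it turns a silent wrong-column bug into an immediate failure if the ordering ever changes.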