flyrain commented on code in PR #11551:
URL: https://github.com/apache/iceberg/pull/11551#discussion_r1863894343
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java:
##########
@@ -245,5 +259,16 @@ void applyEqDelete(ColumnarBatch columnarBatch) {
columnarBatch.setNumRows(currentRowId);
}
+
+ ColumnarBatch removeExtraColumnsFromColumnarBatch(
Review Comment:
Minor suggestion: Simplify to `removeExtraColumns`?
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java:
##########
@@ -45,11 +45,23 @@ public class ColumnarBatchReader extends
BaseBatchReader<ColumnarBatch> {
private final boolean hasIsDeletedColumn;
private DeleteFilter<InternalRow> deletes = null;
private long rowStartPosInBatch = 0;
+ // In the case of Equality Delete, we have also built ColumnarBatchReader
for the equality delete
+ // filter columns to read the value to find out which rows are deleted. If
these deleted filter
+ // columns are not in the requested schema, then these are the extra columns
that we want to
+ // remove before return the ColumnBatch to Spark.
+ // Supposed table schema is C1, C2, C3, C4, C5, The query is:
+ // SELECT C5 FROM table, and the equality delete Filter is on C3, C4,
+ // We read the values of C3, C4 to figure out which rows are deleted, but we
don't want to include
+ // these values in the ColumnBatch that we return to Spark. In this example,
the numOfExtraColumns
+ // is 2. Since when creating the DeleteFilter, we append these extra columns
in the end of the
+ // requested schema, we can just remove them from the end of the
ColumnVector.
+ private int numOfExtraColumns = 0;
- public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
+ public ColumnarBatchReader(List<VectorizedReader<?>> readers, int
numExtraCol) {
Review Comment:
Can we use the DeleteFilter(field `deletes`) within the class, so that no
extra parameter is required? We could move the method `numOfExtraColumns` to
this class in that case.
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java:
##########
@@ -125,4 +126,25 @@ protected VectorizedReader<?>
vectorizedReader(List<VectorizedReader<?>> reorder
return reader;
}
}
+
+ private static int numOfExtraColumns(DeleteFilter deleteFilter) {
Review Comment:
minor: `DeleteFilter` -> `dDeleteFilter<InternalRow>`
##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -73,6 +74,7 @@ protected DeleteFilter(
boolean needRowPosCol) {
this.filePath = filePath;
this.counter = counter;
+ this.requestedSchema = requestedSchema;
Review Comment:
Can we be consistent with the name? `requestedSchema` or `expectedSchema`? I
guess `expectedSchema` is more commonly used.
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java:
##########
@@ -125,4 +126,25 @@ protected VectorizedReader<?>
vectorizedReader(List<VectorizedReader<?>> reorder
return reader;
}
}
+
+ private static int numOfExtraColumns(DeleteFilter deleteFilter) {
+ if (deleteFilter != null) {
+ if (deleteFilter.hasEqDeletes()) {
+ // For Equality Delete, the requiredColumns and expectedColumns may
not be the
+ // same. For example, supposed table schema is C1, C2, C3, C4, C5, The
query is:
+ // SELECT C5 FROM table, and the equality delete Filter is on C3, C4,
then
+ // the requestedSchema is C5, and the required schema is C5, C3 and
C4. The
+ // vectorized reader reads also need to read C3 and C4 columns to
figure out
+ // which rows are deleted. However, after figuring out the deleted
rows, the
+ // extra columns values are not needed to returned to Spark.
+ // Getting the numOfExtraColumns so we can remove these extra columns
+ // from ColumnBatch later.
Review Comment:
Can we move the comment to method's Java doc?
##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -622,6 +624,41 @@ public void
testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOExceptio
assertThat(rowSet(tblName, tbl, "*")).hasSize(193);
}
+ @TestTemplate
+ public void testEqualityDeleteWithDifferentScanAndDeleteColumns() throws
IOException {
Review Comment:
Is it possible to check the intermediate results? for example, checking the
`ColumnarBatch` returned to Spark. We may avoid using `comet` as a dependency
for the test.
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java:
##########
@@ -125,4 +126,25 @@ protected VectorizedReader<?>
vectorizedReader(List<VectorizedReader<?>> reorder
return reader;
}
}
+
+ private static int numOfExtraColumns(DeleteFilter deleteFilter) {
+ if (deleteFilter != null) {
+ if (deleteFilter.hasEqDeletes()) {
+ // For Equality Delete, the requiredColumns and expectedColumns may
not be the
+ // same. For example, supposed table schema is C1, C2, C3, C4, C5, The
query is:
+ // SELECT C5 FROM table, and the equality delete Filter is on C3, C4,
then
+ // the requestedSchema is C5, and the required schema is C5, C3 and
C4. The
+ // vectorized reader reads also need to read C3 and C4 columns to
figure out
+ // which rows are deleted. However, after figuring out the deleted
rows, the
+ // extra columns values are not needed to returned to Spark.
+ // Getting the numOfExtraColumns so we can remove these extra columns
+ // from ColumnBatch later.
+ List<Types.NestedField> requiredColumns =
deleteFilter.requiredSchema().columns();
+ List<Types.NestedField> expectedColumns =
deleteFilter.requestedSchema().columns();
+ return requiredColumns.size() - expectedColumns.size();
Review Comment:
Are we sure the extra columns are consistently appended to the end of the
array?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]