flyrain commented on code in PR #11390:
URL: https://github.com/apache/iceberg/pull/11390#discussion_r1826604505
##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -69,7 +69,8 @@ protected DeleteFilter(
      List<DeleteFile> deletes,
      Schema tableSchema,
      Schema requestedSchema,
-      DeleteCounter counter) {
+      DeleteCounter counter,
+      boolean isBatchReading) {

Review Comment:
Does batch reading necessarily mean no `_pos`? I believe not, as you can explicitly project it, like `select _pos from t`. We should give it an accurate name, something like `needRowPosition` or `needRowPosCol`.

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java:
##########
@@ -97,6 +98,7 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
            // read performance as every batch read doesn't have to pay the cost of allocating memory.
            .reuseContainers()
            .withNameMapping(nameMapping())
+            .hasPositionDelete(hasPositionDelete)

Review Comment:
The key here is determining whether we want to compute row offsets specifically for the filtered row groups. This decision doesn't have to be directly tied to the presence of position deletes. Perhaps a name like `needRowGroupOffset` would better capture this intent and improve clarity.

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java:
##########
@@ -93,10 +93,10 @@ protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
    InputFile inputFile = getInputFile(filePath);
    Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");
-    SparkDeleteFilter deleteFilter =
-        task.deletes().isEmpty()
-            ? null
-            : new SparkDeleteFilter(filePath, task.deletes(), counter());
+    SparkDeleteFilter sparkDeleteFilter =
+        new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
+
+    SparkDeleteFilter deleteFilter = task.deletes().isEmpty() ? null : sparkDeleteFilter;

Review Comment:
We don't need a delete filter if `task.deletes().isEmpty()`, but the new code always creates a filter object. Something like this keeps the old behavior:
```
SparkDeleteFilter deleteFilter = null;
if (!task.deletes().isEmpty()) {
  deleteFilter = new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
}
```

##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -69,7 +69,8 @@ protected DeleteFilter(
      List<DeleteFile> deletes,
      Schema tableSchema,
      Schema requestedSchema,
-      DeleteCounter counter) {
+      DeleteCounter counter,
+      boolean isBatchReading) {

Review Comment:
Thinking about it a bit more, I'm wondering whether only the non-vectorized readers actually need an implicit `_pos` column. If that's the case, would it make more sense to handle this within the row reader by adding `_pos` there? That would eliminate the need to check whether a reader is vectorized, especially since vectorization isn't necessarily correlated with the need for `_pos`. Here is pseudocode to add it in `RowDataReader`:
```
LOG.debug("Opening data file {}", filePath);
expectedSchema().add("_pos");  // <-- add the implicit _pos column here
SparkDeleteFilter deleteFilter =
    new SparkDeleteFilter(filePath, task.deletes(), counter(), false);
```
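A minimal sketch of what that pseudocode could look like, assuming a hypothetical helper (`withRowPosition` and the wrapper class are not existing methods in this PR) that appends the `_pos` metadata column to the row reader's projection only when it is not already projected:

```java
import java.util.List;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;

class RowPositionProjection {
  // Hypothetical helper, not part of this PR: return a projection that includes
  // the _pos metadata column, adding it only when the caller has not already
  // projected it explicitly (e.g. via "select _pos from t").
  static Schema withRowPosition(Schema projection) {
    if (projection.findField(MetadataColumns.ROW_POSITION.fieldId()) != null) {
      return projection;
    }

    List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
    columns.add(MetadataColumns.ROW_POSITION);
    return new Schema(columns);
  }
}
```

With something like this, the delete filter could be built against the expanded projection without needing to know which reader type is calling it.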
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java:
##########
@@ -97,6 +98,7 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
            // read performance as every batch read doesn't have to pay the cost of allocating memory.
            .reuseContainers()
            .withNameMapping(nameMapping())
+            .hasPositionDelete(hasPositionDelete)

Review Comment:
Wondering if `withPositionDelete` would be a more suitable name.

##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -93,7 +94,8 @@ protected DeleteFilter(
    this.posDeletes = posDeleteBuilder.build();
    this.eqDeletes = eqDeleteBuilder.build();
-    this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
+    this.requiredSchema =
+        fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes, isBatchReading);

Review Comment:
One question I asked myself is whether this impacts reading the metadata columns. It seems not, but `DeleteFilter::fileProjection` is a bit hard to read; we can refactor it later. It would make more sense as a static utility method instead of an instance method. It's also a bit awkward to pass the schema to the delete filter and then get it back from the filter. This is something we can improve in a follow-up.

##########
parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java:
##########
@@ -181,8 +184,8 @@ boolean[] shouldSkip() {
    return shouldSkip;
  }
-  private Map<Long, Long> generateOffsetToStartPos(Schema schema) {
-    if (schema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null) {
+  private Map<Long, Long> generateOffsetToStartPos() {
+    if (hasPositionDelete) {

Review Comment:
I'd recommend doing it in the caller, like this:
```
Map<Long, Long> offsetToStartPos = hasPositionDelete ? generateOffsetToStartPos() : null;
```

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java:
##########
@@ -98,6 +98,7 @@ private CloseableIterable<InternalRow> newParquetIterable(
            .filter(residual)
            .caseSensitive(caseSensitive())
            .withNameMapping(nameMapping())
+            .hasPositionDelete(readSchema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null)

Review Comment:
Do we need this for the row reader? Can we use a default value here?
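On the default-value question, a minimal sketch of what a builder-side default might look like, assuming the flag stays on the Parquet read builder; the class name below is illustrative and uses the `needRowGroupOffset` rename suggested earlier, not the actual Iceberg API:

```java
// Illustrative builder fragment; not the real Parquet.ReadBuilder.
class ParquetReadBuilderSketch {
  // Defaults to false so callers such as the row reader can simply omit the call;
  // only the batch path would set it when it needs row-group start offsets.
  private boolean needRowGroupOffset = false;

  ParquetReadBuilderSketch needRowGroupOffset(boolean value) {
    this.needRowGroupOffset = value;
    return this;
  }

  boolean needRowGroupOffset() {
    return needRowGroupOffset;
  }
}
```

With a default like this, `BaseRowReader` would not need to compute the flag from `readSchema` at all.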