flyrain commented on code in PR #11390:
URL: https://github.com/apache/iceberg/pull/11390#discussion_r1826604505
##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -69,7 +69,8 @@ protected DeleteFilter(
List<DeleteFile> deletes,
Schema tableSchema,
Schema requestedSchema,
- DeleteCounter counter) {
+ DeleteCounter counter,
+ boolean isBatchReading) {
Review Comment:
Does batch reading necessarily mean no `_pos`? I believe not, as you can
explicitly project it, like `select _pos from t`. We should give it a more accurate
name, something like `needRowPosition` or `needRowPosCol`.
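For illustration, a minimal Spark snippet of what I mean (the table name `db.sample` and the session setup are hypothetical, not from this PR): even on the vectorized/batch read path a query can explicitly project `_pos`, so the flag is really about whether the row position is needed, not about batch reading itself.
```
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();

// Vectorized (batch) Parquet reads can still require the _pos metadata column
// when the query projects it explicitly.
Dataset<Row> rows = spark.sql("SELECT _pos, * FROM db.sample");
rows.show();
```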
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java:
##########
@@ -97,6 +98,7 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
// read performance as every batch read doesn't have to pay the cost of allocating memory.
.reuseContainers()
.withNameMapping(nameMapping())
+ .hasPositionDelete(hasPositionDelete)
Review Comment:
The key here is determining whether we want to compute row offsets
specifically for the filtered row groups. This decision doesn’t have to be
directly tied to the presence of position deletes. Perhaps a name like
`needRowGroupOffset` would better capture this intent and improve clarity.
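For illustration only, a sketch of how the call site could read with such a name (`needRowGroupOffset` is just a suggested name, not an existing builder method):
```
Parquet.read(inputFile)
    .project(expectedSchema)
    .reuseContainers()
    .withNameMapping(nameMapping())
    // signal that row offsets should be computed for the filtered row groups,
    // regardless of why the caller needs them (position deletes or an explicit _pos projection)
    .needRowGroupOffset(needRowGroupOffset);
```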
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java:
##########
@@ -93,10 +93,10 @@ protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
InputFile inputFile = getInputFile(filePath);
Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");
- SparkDeleteFilter deleteFilter =
- task.deletes().isEmpty()
- ? null
- : new SparkDeleteFilter(filePath, task.deletes(), counter());
+ SparkDeleteFilter sparkDeleteFilter =
+ new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
+
+ SparkDeleteFilter deleteFilter = task.deletes().isEmpty() ? null : sparkDeleteFilter;
Review Comment:
We don't need a delete filter if `task.deletes().isEmpty()`, but the new code
always creates a filter object. For example:
```
SparkDeleteFilter deleteFilter = null;
if (!task.deletes().isEmpty()) {
  deleteFilter = new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
}
```
##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -69,7 +69,8 @@ protected DeleteFilter(
List<DeleteFile> deletes,
Schema tableSchema,
Schema requestedSchema,
- DeleteCounter counter) {
+ DeleteCounter counter,
+ boolean isBatchReading) {
Review Comment:
Thinking about it a bit more, I'm wondering whether only the non-vectorized readers
actually need an implicit `_pos` column. If that's the case, would it make more sense to
handle this within RowReader by adding `_pos` there? This approach could
simplify things by eliminating the need to check whether a reader is
vectorized, especially since vectorization isn't necessarily strongly
correlated with the requirement for `_pos`.
Here is pseudocode to add it in class `RowDataReader`:
```
LOG.debug("Opening data file {}", filePath);
expectedSchema().add("_pos"); // <-- add the implicit _pos column here (pseudocode)
SparkDeleteFilter deleteFilter =
    new SparkDeleteFilter(filePath, task.deletes(), counter(), false);
```
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java:
##########
@@ -97,6 +98,7 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
// read performance as every batch read doesn't have to pay the cost of allocating memory.
.reuseContainers()
.withNameMapping(nameMapping())
+ .hasPositionDelete(hasPositionDelete)
Review Comment:
Wondering if `withPositionDelete` would be a more suitable name.
##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -93,7 +94,8 @@ protected DeleteFilter(
this.posDeletes = posDeleteBuilder.build();
this.eqDeletes = eqDeleteBuilder.build();
- this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
+ this.requiredSchema =
+ fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes, isBatchReading);
Review Comment:
One question I asked myself is whether this impacts metadata column reads. It
seems not, but the method `DeleteFilter::fileProjection` is a bit hard to
read; we can refactor it later. It would make more sense to make it a static
utility method instead of an instance method. It's a bit awkward to pass the
schema to the delete filter and then get it back from the filter. This is
something we can improve as a follow-up.
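For illustration only, a rough sketch of the static-utility shape I have in mind (the signature and the `needRowPosCol` parameter name are assumptions, not code in this PR):
```
static Schema fileProjection(
    Schema tableSchema,
    Schema requestedSchema,
    List<DeleteFile> posDeletes,
    List<DeleteFile> eqDeletes,
    boolean needRowPosCol) {
  if (posDeletes.isEmpty() && eqDeletes.isEmpty() && !needRowPosCol) {
    return requestedSchema;
  }
  // ... the existing projection logic would move here unchanged, so callers can
  // compute the schema up front instead of reading it back from the filter instance
  return requestedSchema;
}
```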
##########
parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java:
##########
@@ -181,8 +184,8 @@ boolean[] shouldSkip() {
return shouldSkip;
}
- private Map<Long, Long> generateOffsetToStartPos(Schema schema) {
- if (schema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null) {
+ private Map<Long, Long> generateOffsetToStartPos() {
+ if (hasPositionDelete) {
Review Comment:
I'd recommend doing it in the caller, like this:
```
Map<Long, Long> offsetToStartPos = hasPositionDelete ? generateOffsetToStartPos() : null;
```
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java:
##########
@@ -98,6 +98,7 @@ private CloseableIterable<InternalRow> newParquetIterable(
.filter(residual)
.caseSensitive(caseSensitive())
.withNameMapping(nameMapping())
+ .hasPositionDelete(readSchema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null)
Review Comment:
Do we need it for the row reader? Can we use a default value here?
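As a sketch of what I mean (assuming the flag stays on the Parquet `ReadBuilder`; the `false` default is an assumption, not this PR's code): if the builder defaults the value, the row reader could simply omit the call.
```
// Inside the Parquet ReadBuilder (sketch, not this PR's code):
private boolean hasPositionDelete = false; // default: no row-group offset computation needed

public ReadBuilder hasPositionDelete(boolean hasPositionDelete) {
  this.hasPositionDelete = hasPositionDelete;
  return this;
}
```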
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]