aokolnychyi commented on code in PR #11657:
URL: https://github.com/apache/iceberg/pull/11657#discussion_r1872359214


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -90,6 +91,10 @@ protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
         ExpressionUtil.extractByIdInclusive(
             task.residual(), expectedSchema(), caseSensitive(), Ints.toArray(nonConstantFieldIds));
 
+    if (ContentFileUtil.isDV(task.file())) {

Review Comment:
   Do we need `Iterable` or can we simply implement `Iterator` directly? It seems we create an iterable for the sake of getting an iterator for it.
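   A minimal sketch of the shape this suggests: the iterator itself owns the closeable resource and is handed back directly, with no single-use `Iterable` wrapper in between. Names below are illustrative, not the PR's actual classes.

   ```java
   import java.io.Closeable;
   import java.io.IOException;
   import java.util.Iterator;
   import org.apache.iceberg.io.CloseableIterator;

   // Illustrative only: a CloseableIterator that wraps a plain Iterator and
   // owns the resource backing it, so callers get an iterator directly
   // instead of materializing an Iterable first.
   class ResourceBackedIterator<T> implements CloseableIterator<T> {
     private final Closeable resource;
     private final Iterator<T> delegate;

     ResourceBackedIterator(Closeable resource, Iterator<T> delegate) {
       this.resource = resource;
       this.delegate = delegate;
     }

     @Override
     public boolean hasNext() {
       return delegate.hasNext();
     }

     @Override
     public T next() {
       return delegate.next();
     }

     @Override
     public void close() throws IOException {
       resource.close();
     }
   }
   ```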
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -90,6 +91,10 @@ protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
         ExpressionUtil.extractByIdInclusive(
             task.residual(), expectedSchema(), caseSensitive(), Ints.toArray(nonConstantFieldIds));
 
+    if (ContentFileUtil.isDV(task.file())) {
+      return new DVIterable(inputFile, task.file(), task.spec(), expectedSchema()).iterator();

Review Comment:
   What about constants? What would happen if we project `_partition` or `_spec_id` metadata columns? Is it something that we support for the `position_deletes` table right now?
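   For context, a hedged sketch of the kind of constant map Iceberg readers usually prefill for metadata columns; the helper and its use here are hypothetical:

   ```java
   import java.util.Map;
   import org.apache.iceberg.MetadataColumns;
   import org.apache.iceberg.PositionDeletesScanTask;
   import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

   // Hypothetical helper: constant values keyed by field ID for columns that
   // never vary within a task, e.g. _spec_id and the delete file path.
   // _partition would additionally need the partition struct converted to the
   // engine's row representation, so it is omitted here.
   class MetadataConstantsSketch {
     static Map<Integer, Object> forTask(PositionDeletesScanTask task) {
       return ImmutableMap.of(
           MetadataColumns.SPEC_ID.fieldId(), task.spec().specId(),
           MetadataColumns.FILE_PATH.fieldId(), task.file().path().toString());
     }
   }
   ```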
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterable.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.puffin.BlobMetadata;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.StructLikeUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
+
+class DVIterable extends CloseableGroup implements CloseableIterable<InternalRow> {
+  private final Puffin.ReadBuilder builder;
+  private final PartitionSpec spec;
+  private final DeleteFile deleteFile;
+  private final Schema projection;
+
+  DVIterable(InputFile inputFile, DeleteFile deleteFile, PartitionSpec spec, Schema projection) {
+    this.deleteFile = deleteFile;
+    this.builder = Puffin.read(inputFile);
+    this.spec = spec;
+    this.projection = projection;
+  }
+
+  @Override
+  public CloseableIterator<InternalRow> iterator() {
+    PuffinReader reader = builder.build();
+    addCloseable(reader);
+    return new DVIterator(reader);
+  }
+
+  private class DVIterator implements CloseableIterator<InternalRow> {
+    private final PuffinReader reader;
+    private Iterator<Long> positions = Collections.emptyIterator();
+    private Integer deletedPositionIndex;
+    private GenericInternalRow row;
+
+    DVIterator(PuffinReader reader) {
+      this.reader = reader;
+      try {
+        reader.fileMetadata().blobs().stream()

Review Comment:
   I don't think we should use the Puffin reader in this case. Can we use `DeleteLoader` instead? We need to do a range read directly as `DeleteFile` has all information like offset and length. We should have all needed logic already in `BaseDeleteLoader`. Reading the footer of the Puffin file is unnecessary.
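   A hedged sketch of that direction, assuming `BaseDeleteLoader`'s `loadPositionDeletes(Iterable<DeleteFile>, CharSequence)` signature and `PositionDeleteIndex.forEach(LongConsumer)`; worth double-checking both against the actual classes:

   ```java
   import java.util.function.LongConsumer;
   import org.apache.iceberg.DeleteFile;
   import org.apache.iceberg.data.BaseDeleteLoader;
   import org.apache.iceberg.deletes.PositionDeleteIndex;
   import org.apache.iceberg.io.InputFile;
   import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

   // Sketch: let the delete loader perform the range read described by the
   // DeleteFile (content offset + length) instead of opening the Puffin file
   // and scanning its footer for blob metadata.
   class DVLoadSketch {
     static PositionDeleteIndex loadDV(InputFile puffinFile, DeleteFile dv) {
       BaseDeleteLoader loader = new BaseDeleteLoader(file -> puffinFile);
       // a DV applies to exactly one data file, which it references by path
       return loader.loadPositionDeletes(ImmutableList.of(dv), dv.referencedDataFile());
     }

     static void visitDeletedPositions(PositionDeleteIndex index, LongConsumer consumer) {
       index.forEach(consumer); // visits each deleted row position
     }
   }
   ```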
##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseScanTaskGroup;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.TestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestPositionDeletesReader extends TestBase {

Review Comment:
   Can we also test reading DVs end-to-end by querying the `position_deletes` metadata table in Spark? I think we can commit DVs from Java and then read them from Spark, as we don't have DVs in Spark right now?
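   A rough sketch of such an end-to-end check, assuming a test table whose DVs were committed through the Java API; the table name and selected columns are illustrative:

   ```java
   import java.util.List;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SparkSession;

   // Sketch: after committing a DV via the Java API, read it back through the
   // position_deletes metadata table in Spark and assert on the returned rows.
   class PositionDeletesEndToEndSketch {
     static List<Row> readPositionDeletes(SparkSession spark, String table) {
       return spark
           .sql("SELECT file_path, pos FROM " + table + ".position_deletes ORDER BY pos")
           .collectAsList();
     }
   }
   ```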
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterable.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.puffin.BlobMetadata;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
+
+class DVIterable extends CloseableGroup implements CloseableIterable<InternalRow> {
+  private final Puffin.ReadBuilder builder;
+  private final PartitionSpec spec;
+  private final DeleteFile deleteFile;
+  private final Schema projection;
+
+  DVIterable(InputFile inputFile, DeleteFile deleteFile, PartitionSpec spec, Schema projection) {
+    this.deleteFile = deleteFile;
+    this.builder = Puffin.read(inputFile);
+    this.spec = spec;
+    this.projection = projection;
+  }
+
+  @Override
+  public CloseableIterator<InternalRow> iterator() {
+    PuffinReader reader = builder.build();

Review Comment:
   I'd argue we rarely need to read the entire DV file, as not all DVs may still be valid.

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -90,6 +91,10 @@ protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
         ExpressionUtil.extractByIdInclusive(
             task.residual(), expectedSchema(), caseSensitive(), Ints.toArray(nonConstantFieldIds));
 
+    if (ContentFileUtil.isDV(task.file())) {
+      return new DVIterable(inputFile, task.file(), task.spec(), expectedSchema()).iterator();

Review Comment:
   Okay, I see that we populate those constants again in `DVIterable`. Can we pass `idToConstant` there instead? If I understand correctly, we should have all constant values computed by the time we create `DVIterable`.
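   A minimal sketch of the constructor shape being asked for, with the constants computed by the caller and passed in; all names are illustrative:

   ```java
   import java.util.Map;
   import org.apache.iceberg.DeleteFile;
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.io.InputFile;

   // Hypothetical shape: DVIterable receives the idToConstant map the reader
   // already computed (e.g. _partition and _spec_id values keyed by field ID)
   // instead of rebuilding those constants internally.
   class DVIterableWithConstantsSketch {
     private final InputFile inputFile;
     private final DeleteFile deleteFile;
     private final Schema projection;
     private final Map<Integer, ?> idToConstant;

     DVIterableWithConstantsSketch(
         InputFile inputFile,
         DeleteFile deleteFile,
         Schema projection,
         Map<Integer, ?> idToConstant) {
       this.inputFile = inputFile;
       this.deleteFile = deleteFile;
       this.projection = projection;
       this.idToConstant = idToConstant;
     }
   }
   ```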