aokolnychyi commented on code in PR #6716: URL: https://github.com/apache/iceberg/pull/6716#discussion_r1101938158
########## core/src/main/java/org/apache/iceberg/PositionDeletesTable.java: ########## @@ -43,15 +43,21 @@ public class PositionDeletesTable extends BaseMetadataTable { private final Schema schema; + private final int defaultSpecId; + private final Map<Integer, PartitionSpec> specs; PositionDeletesTable(Table table) { Review Comment: The constructors seem repetitive. Can we just call the other one here? ``` this(table, table.name() + ".position_deletes"); ``` ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.Map; +import java.util.stream.Stream; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.PartitionUtil; +import org.apache.spark.rdd.InputFileBlockHolder; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask> Review Comment: nit: `PositionDeleteRowReader` -> `PositionDeletesRowReader` (plural)? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.Map; +import java.util.stream.Stream; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.PartitionUtil; +import org.apache.spark.rdd.InputFileBlockHolder; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask> + implements PartitionReader<InternalRow> { + private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class); Review Comment: nit: What about an empty line before this as the class definition is split into multiple lines? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.Map; +import java.util.stream.Stream; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.PartitionUtil; +import org.apache.spark.rdd.InputFileBlockHolder; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask> + implements PartitionReader<InternalRow> { + private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class); + + PositionDeleteRowReader(SparkInputPartition partition) { + this( + partition.table(), + partition.taskGroup(), + partition.expectedSchema(), + partition.isCaseSensitive()); + } + + PositionDeleteRowReader( + Table table, + ScanTaskGroup<PositionDeletesScanTask> taskGroup, + Schema expectedSchema, + boolean caseSensitive) { + + super(table, taskGroup, expectedSchema, caseSensitive); + + int numSplits = taskGroup.tasks().size(); + LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name()); + } + + @Override + protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) { + return Stream.of(task.file()); + } + + @Override + protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) { + Preconditions.checkArgument( Review Comment: Do we really need this check? Isn't it sufficient to know we got `PositionDeletesScanTask`? I doubt it will be false at any point. Even if someone passes a custom table implementation, the code should work as we just need to follow the contract of the task and table. ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.Map; +import java.util.stream.Stream; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.PartitionUtil; +import org.apache.spark.rdd.InputFileBlockHolder; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask> + implements PartitionReader<InternalRow> { + private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class); + + PositionDeleteRowReader(SparkInputPartition partition) { + this( + partition.table(), + partition.taskGroup(), + partition.expectedSchema(), + partition.isCaseSensitive()); + } + + PositionDeleteRowReader( + Table table, + ScanTaskGroup<PositionDeletesScanTask> taskGroup, + Schema expectedSchema, + boolean caseSensitive) { + + super(table, taskGroup, expectedSchema, caseSensitive); + + int numSplits = taskGroup.tasks().size(); + LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name()); + } + + @Override + protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) { + return Stream.of(task.file()); + } + + @Override + protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) { + Preconditions.checkArgument( + table() instanceof PositionDeletesTable + || (table() instanceof SerializableTable.SerializableMetadataTable + && ((SerializableTable.SerializableMetadataTable) table()) + .type() + .equals(MetadataTableType.POSITION_DELETES)), + "PositionDeleteRowReader is only supported for PositionDeletesTable"); + + Types.StructType partitionType = Partitioning.partitionType(table()); + Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema(), partitionType); + String filePath = task.file().path().toString(); + LOG.debug("Opening position delete file {}", filePath); + + // update the current file for Spark's filename() function + InputFileBlockHolder.set(filePath, task.start(), task.length()); + + InputFile inputFile = getInputFile(task.file().path().toString()); + Preconditions.checkNotNull( + inputFile, "Could not find InputFile associated with PositionDeleteScanTask"); + return newIterable( + inputFile, + task.file().format(), + task.start(), + task.length(), + task.residual(), + expectedSchema(), + idToConstant) + .iterator(); + } + + protected Map<Integer, ?> constantsMap( Review Comment: Can't we use `constantsMap()` from `BaseReader` now? ########## core/src/main/java/org/apache/iceberg/SerializableTable.java: ########## @@ -116,7 +116,7 @@ private FileIO fileIO(Table table) { return table.io(); } - private Table lazyTable() { + protected Table lazyTable() { Review Comment: Is this still needed? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.Map; +import java.util.stream.Stream; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.PartitionUtil; +import org.apache.spark.rdd.InputFileBlockHolder; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask> + implements PartitionReader<InternalRow> { + private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class); + + PositionDeleteRowReader(SparkInputPartition partition) { + this( + partition.table(), + partition.taskGroup(), + partition.expectedSchema(), + partition.isCaseSensitive()); + } + + PositionDeleteRowReader( + Table table, + ScanTaskGroup<PositionDeletesScanTask> taskGroup, + Schema expectedSchema, + boolean caseSensitive) { + + super(table, taskGroup, expectedSchema, caseSensitive); + + int numSplits = taskGroup.tasks().size(); + LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name()); + } + + @Override + protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) { + return Stream.of(task.file()); + } + + @Override + protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) { + Preconditions.checkArgument( + table() instanceof PositionDeletesTable + || (table() instanceof SerializableTable.SerializableMetadataTable + && ((SerializableTable.SerializableMetadataTable) table()) + .type() + .equals(MetadataTableType.POSITION_DELETES)), + "PositionDeleteRowReader is only supported for PositionDeletesTable"); + + Types.StructType partitionType = Partitioning.partitionType(table()); + Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema(), partitionType); + String filePath = task.file().path().toString(); + LOG.debug("Opening position delete file {}", filePath); + + // update the current file for Spark's filename() function + InputFileBlockHolder.set(filePath, task.start(), task.length()); Review Comment: Do we have a test that verifies that `input_file_name()` actually works? ########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.Map; +import java.util.stream.Stream; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.PartitionUtil; +import org.apache.spark.rdd.InputFileBlockHolder; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask> + implements PartitionReader<InternalRow> { + private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class); + + PositionDeleteRowReader(SparkInputPartition partition) { + this( + partition.table(), + partition.taskGroup(), + partition.expectedSchema(), + partition.isCaseSensitive()); + } + + PositionDeleteRowReader( + Table table, + ScanTaskGroup<PositionDeletesScanTask> taskGroup, + Schema expectedSchema, + boolean caseSensitive) { + + super(table, taskGroup, expectedSchema, caseSensitive); + + int numSplits = taskGroup.tasks().size(); + LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name()); + } + + @Override + protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) { + return Stream.of(task.file()); + } + + @Override + protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) { + Preconditions.checkArgument( + table() instanceof PositionDeletesTable + || (table() instanceof SerializableTable.SerializableMetadataTable + && ((SerializableTable.SerializableMetadataTable) table()) + .type() + .equals(MetadataTableType.POSITION_DELETES)), + "PositionDeleteRowReader is only supported for PositionDeletesTable"); + + Types.StructType partitionType = Partitioning.partitionType(table()); + Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema(), partitionType); + String filePath = task.file().path().toString(); + LOG.debug("Opening position delete file {}", filePath); + + // update the current file for Spark's filename() function + InputFileBlockHolder.set(filePath, task.start(), task.length()); + + InputFile inputFile = getInputFile(task.file().path().toString()); + Preconditions.checkNotNull( + inputFile, "Could not find InputFile associated with PositionDeleteScanTask"); + return newIterable( Review Comment: nit: What about an empty line before `return`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org