aokolnychyi commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1094892920
########## core/src/main/java/org/apache/iceberg/PositionDeletesTable.java: ##########
@@ -75,16 +75,15 @@ public Schema schema() {
     return schema;
   }
 
-  private Schema calculateSchema() {
-    Types.StructType partitionType = Partitioning.partitionType(table());
+  public static Schema schema(Schema schema, Types.StructType partitionType) {

Review Comment:
Are we opening this up purely for testing? If so, can we simply construct a table there and call `schema()`? I don't mind having package-private methods visible for testing, but it would be nice to avoid extra public methods. We are trying to be more careful about what is exposed, even to developers.

########## parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java: ##########
@@ -50,15 +51,22 @@ public class ParquetMetricsRowGroupFilter {
 
   private final Schema schema;
   private final Expression expr;
+  private final Set<Integer> constantFieldIds;
 
   public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound) {
-    this(schema, unbound, true);
+    this(schema, unbound, true, ImmutableSet.of());

Review Comment:
The fix here seems correct. I think we also have this issue in CDC. Let me check there.
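For context, a caller-side sketch of how the new `constantFieldIds` parameter might be wired up. The four-argument constructor order is inferred from the delegation shown in the diff; the wrapper class and method (`RowGroupFilterSketch`, `shouldReadRowGroup`), the variable names, and the particular constant IDs chosen here are illustrative assumptions, not part of this PR:

```
import java.util.Set;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.schema.MessageType;

class RowGroupFilterSketch {

  // Hypothetical helper: readers that project constant columns (e.g. partition or
  // other metadata columns that are not stored in the data files) pass those field
  // IDs so the filter does not prune row groups based on statistics that don't exist.
  static boolean shouldReadRowGroup(
      Schema expectedSchema,
      Expression residual,
      boolean caseSensitive,
      MessageType fileSchema,
      BlockMetaData rowGroup) {

    Set<Integer> constantFieldIds =
        ImmutableSet.of(MetadataColumns.SPEC_ID.fieldId(), MetadataColumns.PARTITION_COLUMN_ID);

    ParquetMetricsRowGroupFilter filter =
        new ParquetMetricsRowGroupFilter(expectedSchema, residual, caseSensitive, constantFieldIds);

    return filter.shouldRead(fileSchema, rowGroup);
  }
}
```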
########## core/src/main/java/org/apache/iceberg/MetadataTable.java: ##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/** Interface representing a metadata table. */
+public interface MetadataTable {

Review Comment:
If I understand correctly, this interface exists only to build a correct partition map with constants. What about making our position deletes table expose the correct partitioning of the base table instead? Would it be fair to say it is partitioned in the same way as the main table? Delete files are also annotated with spec IDs. If so, it will be a matter of adding the following logic in `PositionDeletesTable`:

```
private final int defaultSpecId;
private final Map<Integer, PartitionSpec> specs;

...

PositionDeletesTable(Table table, String name) {
  super(table, name);
  this.schema = schema(table().schema(), Partitioning.partitionType(table()));
  this.defaultSpecId = table.spec().specId();
  this.specs = table.specs();
}

...

@Override
public PartitionSpec spec() {
  return specs.get(defaultSpecId);
}

@Override
public Map<Integer, PartitionSpec> specs() {
  return specs;
}
```

After that, we should be able to remove this interface and also simply use `BaseReader$constantsMap` and remove most of the custom logic in `PositionDeleteRowReader`.

########## spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java: ##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTable;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Table deletesTable = table();
+    Preconditions.checkArgument(
+        deletesTable instanceof MetadataTable,
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+    Preconditions.checkArgument(
+        ((MetadataTable) deletesTable).type().equals(MetadataTableType.POSITION_DELETES),
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+    Table baseTable = ((MetadataTable) deletesTable).baseTable();

Review Comment:
I feel this logic won't be needed if we expose correct specs in the position deletes metadata table, as I mentioned above.
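For illustration, a rough sketch of what `open(...)` could collapse to once `PositionDeletesTable` reports the base table's specs. It assumes the `getInputFile`, `constantsMap`, `newIterable`, and `expectedSchema()` helpers that the other Spark row readers inherit from `BaseReader`/`BaseRowReader` behave the same way here; the exact helper signatures and the final shape of the method are assumptions, not something defined in this PR:

```
  // Illustrative sketch only: assumes PositionDeletesTable exposes the base table's
  // specs, so the shared BaseReader constants logic can resolve metadata columns
  // without casting to MetadataTable or reaching back to the base table.
  @Override
  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
    String filePath = task.file().path().toString();
    LOG.debug("Opening position delete file {}", filePath);

    // update the current file for Spark's filename() function
    InputFileBlockHolder.set(filePath, task.start(), task.length());

    InputFile inputFile = getInputFile(filePath);
    Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with %s", filePath);

    // constantsMap(...) builds constant values for metadata columns (such as the
    // partition struct) from the task's spec, which would now come from specs()
    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());

    return newIterable(
            inputFile,
            task.file().format(),
            task.start(),
            task.length(),
            task.residual(),
            expectedSchema(),
            idToConstant)
        .iterator();
  }
```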
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org