aokolnychyi commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1094892920


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -75,16 +75,15 @@ public Schema schema() {
     return schema;
   }
 
-  private Schema calculateSchema() {
-    Types.StructType partitionType = Partitioning.partitionType(table());
+  public static Schema schema(Schema schema, Types.StructType partitionType) {

Review Comment:
   Are we opening this up purely for testing? If so, can we simply construct a
table there and call `schema()`? I don't mind having package-private methods
visible for testing, but it would be nice to avoid extra public methods. We are
trying to be more careful about what is exposed, even to developers.
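   Something along these lines might be enough (rough sketch; assumes the test
sits in `org.apache.iceberg` so a package-private constructor like
`PositionDeletesTable(Table, String)` is visible, and the names are illustrative):
   ```
   @Test
   public void testPositionDeletesSchema() {
     Table posDeletesTable = new PositionDeletesTable(table, table.name() + ".position_deletes");
     // calling schema() on the constructed metadata table exercises the same code path
     Schema schema = posDeletesTable.schema();
     // assert on the expected fields here rather than exposing a public static schema(...) helper
   }
   ```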



##########
parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java:
##########
@@ -50,15 +51,22 @@ public class ParquetMetricsRowGroupFilter {
 
   private final Schema schema;
   private final Expression expr;
+  private final Set<Integer> constantFieldIds;
 
   public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound) {
-    this(schema, unbound, true);
+    this(schema, unbound, true, ImmutableSet.of());

Review Comment:
   The fix here seems correct. I think we also have this issue in CDC. Let me 
check there.
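   For context, the guard this kind of fix adds usually looks roughly like the
following (illustrative sketch only; the method name and fallthrough are not
necessarily what this PR does):
   ```
   // Fields backed by constants (e.g. partition or metadata columns) have no
   // column chunks or stats in the Parquet file, so the metrics filter must not
   // prune row groups based on their missing statistics.
   private boolean mayMatch(int fieldId) {
     if (constantFieldIds.contains(fieldId)) {
       return true; // rows might match; there is nothing in the row group to check
     }
     // ... fall through to the usual min/max/null-count evaluation
     return true;
   }
   ```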



##########
core/src/main/java/org/apache/iceberg/MetadataTable.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/** Interface representing a metadata table. */
+public interface MetadataTable {

Review Comment:
   If I understand correctly, this interface exists only to build a correct 
partition map with constants. What about making our position deletes table 
expose the correct partitioning of the base table instead? Would it be fair to 
say it is partitioned in the same way as the main table? Delete files are also 
annotated with spec IDs.
   
   If so, it would be a matter of adding the following logic to
`PositionDeletesTable`:
   ```
   private final int defaultSpecId;
   private final Map<Integer, PartitionSpec> specs;
   
   ...
   
   PositionDeletesTable(Table table, String name) {
     super(table, name);
     this.schema = schema(table().schema(), Partitioning.partitionType(table()));
     this.defaultSpecId = table.spec().specId();
     this.specs = table.specs();
   }
   
   ...
   
   @Override
   public PartitionSpec spec() {
     return specs.get(defaultSpecId);
   }
   
   @Override
   public Map<Integer, PartitionSpec> specs() {
     return specs;
   }
   ```
   
   After that, we should be able to remove this interface, use
`BaseReader#constantsMap` directly, and drop most of the custom logic in
`PositionDeleteRowReader`.
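   For reference, the constants handling would then boil down to something like
this on the read path (paraphrased sketch, not the exact `BaseReader` code; the
conversion function is a placeholder):
   ```
   // With the base table's specs reported by the position deletes table, the
   // partition constants for a task can be resolved generically from the task's
   // spec and the unified partition type.
   Types.StructType partitionType = Partitioning.partitionType(table());
   Map<Integer, ?> idToConstant =
       PartitionUtil.constantsMap(task, partitionType, (type, value) -> value /* placeholder */);
   ```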



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTable;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", 
numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Table deletesTable = table();
+    Preconditions.checkArgument(
+        deletesTable instanceof MetadataTable,
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+    Preconditions.checkArgument(
+        ((MetadataTable) deletesTable).type().equals(MetadataTableType.POSITION_DELETES),
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+    Table baseTable = ((MetadataTable) deletesTable).baseTable();

Review Comment:
   I feel this logic won't be needed if we expose the correct specs in the
position deletes metadata table, as I mentioned above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

