aokolnychyi commented on code in PR #6365:
URL: https://github.com/apache/iceberg/pull/6365#discussion_r1055905192


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -94,6 +94,10 @@ private MetadataColumns() {}
           Types.LongType.get(),
           "Commit snapshot ID");
 
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 107;
+  public static final int POSITION_DELETE_TABLE_SPEC_ID = Integer.MAX_VALUE - 108;
+  public static final int POSITION_DELETE_TABLE_FILE_PATH = Integer.MAX_VALUE - 109;

Review Comment:
   Name missing ID?



##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -94,6 +94,10 @@ private MetadataColumns() {}
           Types.LongType.get(),
           "Commit snapshot ID");
 
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 107;

Review Comment:
   If I understand correctly, the table schema will include these 3 columns in addition to the columns in delete files. Reserving some IDs is not bad, but have we thought about keeping the table schema limited to the content of delete files and supporting the already existing `_spec_id`, `_partition`, and `_file` metadata columns instead? Values for metadata columns would be projected only on demand, just like we do for regular tables.
   
   It seems cleaner to me and shouldn't be hard to do since we will have a dedicated reader.
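   
   A rough sketch of the on-demand idea, in case it helps. Everything here is a hypothetical stand-in rather than Iceberg API: plain maps play the role of delete rows and file-level metadata, and the column name lists are assumptions for illustration (only `_spec_id`, `_partition`, and `_file` come from the existing metadata columns). The point is only that the table schema stays equal to the delete file content, and metadata values are attached when a query asks for them.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names are Iceberg classes.
public class OnDemandMetadataColumns {
  // Assumed columns stored in a position delete file.
  public static final List<String> DELETE_FILE_COLUMNS = List.of("file_path", "pos", "row");
  // Existing metadata columns, not part of the table schema.
  public static final List<String> METADATA_COLUMNS = List.of("_spec_id", "_partition", "_file");

  // The table schema is limited to the delete file content.
  public static List<String> tableSchema() {
    return DELETE_FILE_COLUMNS;
  }

  // Builds the output row for the requested columns. Metadata values are
  // looked up only if the projection names them.
  public static Map<String, Object> project(
      Map<String, Object> deleteRow, Map<String, Object> fileMetadata, List<String> requested) {
    Map<String, Object> projected = new LinkedHashMap<>();
    for (String column : requested) {
      if (DELETE_FILE_COLUMNS.contains(column)) {
        projected.put(column, deleteRow.get(column));
      } else if (METADATA_COLUMNS.contains(column)) {
        projected.put(column, fileMetadata.get(column));
      } else {
        throw new IllegalArgumentException("Unknown column: " + column);
      }
    }
    return projected;
  }
}
```

   With this shape, a projection of `pos` and `_spec_id` returns just those two values, while a plain scan of the table never touches the metadata map.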



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_FILE_PATH;
+import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_PARTITION_FIELD_ID;
+import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_SPEC_ID;
+
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  private final Schema schema;
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+    this.schema = calculateSchema();
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+    this.schema = calculateSchema();
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    throw new UnsupportedOperationException(
+        "Cannot create TableScan from table of type POSITION_DELETES");
+  }
+
+  @Override
+  public BatchScan newBatchScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return schema;
+  }
+
+  private Schema calculateSchema() {
+    Types.StructType partitionType = Partitioning.partitionType(table());
+    Schema result =
+        new Schema(
+            MetadataColumns.DELETE_FILE_PATH,
+            MetadataColumns.DELETE_FILE_POS,
+            Types.NestedField.optional(
+                MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
+                "row",
+                table().schema().asStruct(),
+                MetadataColumns.DELETE_FILE_ROW_DOC),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_PARTITION_FIELD_ID,
+                "partition",
+                partitionType,
+                "Partition that position delete row belongs to"),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_SPEC_ID,
+                "spec_id",
+                Types.IntegerType.get(),
+                "Spec ID of the file that the position delete row belongs to"),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_FILE_PATH,
+                "delete_file_path",
+                Types.StringType.get(),
+                "Spec ID of the file that the position delete row belongs to"));
+
+    if (partitionType.fields().size() > 0) {
+      return result;
+    } else {
+      // avoid returning an empty struct, which is not always supported. instead, drop the partition
+      // field
+      return TypeUtil.selectNot(result, Sets.newHashSet(POSITION_DELETE_TABLE_PARTITION_FIELD_ID));
+    }
+  }
+
+  public static class PositionDeletesTableScan
+      extends SnapshotScan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> implements BatchScan {
+
+    protected PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+      super(ops, table, schema, new TableScanContext());
+    }
+
+    protected PositionDeletesTableScan(
+        TableOperations ops, Table table, Schema schema, TableScanContext context) {
+      super(ops, table, schema, context);
+    }
+
+    @Override
+    protected PositionDeletesTableScan newRefinedScan(
+        TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+      return new PositionDeletesTableScan(newOps, newTable, newSchema, newContext);
+    }
+
+    @Override
+    public CloseableIterable<ScanTaskGroup<ScanTask>> planTasks() {
+      CloseableIterable<ScanTask> scanTasks = planFiles();

Review Comment:
   nit: Just use `planFiles()` directly?



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,372 @@
[...]
+    @Override
+    public CloseableIterable<ScanTaskGroup<ScanTask>> planTasks() {
+      CloseableIterable<ScanTask> scanTasks = planFiles();
+      return TableScanUtil.planTaskGroups(
+          scanTasks, targetSplitSize(), splitLookback(), splitOpenFileCost());
+    }
+
+    @Override
+    protected CloseableIterable<ScanTask> doPlanFiles() {
+      Expression rowFilter = context().rowFilter();

Review Comment:
   nit: We could just use `filter()` directly, which wraps `context().rowFilter()`.



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,372 @@
[...]
+    @Override
+    protected CloseableIterable<ScanTask> doPlanFiles() {
+      Expression rowFilter = context().rowFilter();
+      String schemaString = SchemaParser.toJson(tableSchema());
+
+      Map<Integer, PartitionSpec> transformedSpecs =
+          table().specs().entrySet().stream()
+              .map(
+                  e ->
+                      Pair.of(
+                          e.getKey(), BaseMetadataTable.transformSpec(tableSchema(), e.getValue())))
+              .collect(Collectors.toMap(Pair::first, Pair::second));
+
+      CloseableIterable<ManifestFile> deleteManifests =
+          CloseableIterable.withNoopClose(snapshot().deleteManifests(tableOps().io()));
+      CloseableIterable<CloseableIterable<ScanTask>> results =
+          CloseableIterable.transform(
+              deleteManifests,
+              m -> {

Review Comment:
   nit: `m` -> `manifest`?



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,372 @@
[...]
+    @Override
+    protected CloseableIterable<ScanTask> doPlanFiles() {
+      Expression rowFilter = context().rowFilter();
+      String schemaString = SchemaParser.toJson(tableSchema());
+
+      Map<Integer, PartitionSpec> transformedSpecs =
+          table().specs().entrySet().stream()
+              .map(
+                  e ->
+                      Pair.of(
+                          e.getKey(), BaseMetadataTable.transformSpec(tableSchema(), e.getValue())))
+              .collect(Collectors.toMap(Pair::first, Pair::second));
+
+      CloseableIterable<ManifestFile> deleteManifests =
+          CloseableIterable.withNoopClose(snapshot().deleteManifests(tableOps().io()));
+      CloseableIterable<CloseableIterable<ScanTask>> results =
+          CloseableIterable.transform(
+              deleteManifests,
+              m -> {
+                // Filter partitions
+                CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries =
+                    ManifestFiles.readDeleteManifest(m, tableOps().io(), transformedSpecs)
+                        .caseSensitive(isCaseSensitive())
+                        .filterRows(rowFilter)
+                        .liveEntries();
+
+                // Filter delete file type
+                CloseableIterable<ManifestEntry<DeleteFile>> positionDeleteEntries =
+                    CloseableIterable.filter(
+                        deleteFileEntries,
+                        entry -> entry.file().content().equals(FileContent.POSITION_DELETES));
+
+                Types.StructType partitionType = Partitioning.partitionType(table());
+
+                return CloseableIterable.transform(

Review Comment:
   I'd need to check resource closure in this block with fresh eyes.



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,372 @@
[...]
+    if (partitionType.fields().size() > 0) {
+      return result;
+    } else {
+      // avoid returning an empty struct, which is not always supported. instead, drop the partition
+      // field

Review Comment:
   nit: What about moving the entire second sentence to another line? Kind of awkward to just have one word.



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_FILE_PATH;
+import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_PARTITION_FIELD_ID;
+import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_SPEC_ID;
+
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  private final Schema schema;
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+    this.schema = calculateSchema();
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+    this.schema = calculateSchema();
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    throw new UnsupportedOperationException(
+        "Cannot create TableScan from table of type POSITION_DELETES");
+  }
+
+  @Override
+  public BatchScan newBatchScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return schema;
+  }
+
+  private Schema calculateSchema() {
+    Types.StructType partitionType = Partitioning.partitionType(table());
+    Schema result =
+        new Schema(
+            MetadataColumns.DELETE_FILE_PATH,
+            MetadataColumns.DELETE_FILE_POS,
+            Types.NestedField.optional(
+                MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
+                "row",
+                table().schema().asStruct(),
+                MetadataColumns.DELETE_FILE_ROW_DOC),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_PARTITION_FIELD_ID,
+                "partition",
+                partitionType,
+                "Partition that position delete row belongs to"),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_SPEC_ID,
+                "spec_id",
+                Types.IntegerType.get(),
+                "Spec ID of the file that the position delete row belongs to"),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_FILE_PATH,
+                "delete_file_path",
+                Types.StringType.get(),
+                "Spec ID of the file that the position delete row belongs 
to"));
+
+    if (partitionType.fields().size() > 0) {
+      return result;
+    } else {
+      // avoid returning an empty struct, which is not always supported. 
instead, drop the partition
+      // field
+      return TypeUtil.selectNot(result, 
Sets.newHashSet(POSITION_DELETE_TABLE_PARTITION_FIELD_ID));
+    }
+  }
+
+  public static class PositionDeletesTableScan
+      extends SnapshotScan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> 
implements BatchScan {
+
+    protected PositionDeletesTableScan(TableOperations ops, Table table, 
Schema schema) {
+      super(ops, table, schema, new TableScanContext());
+    }
+
+    protected PositionDeletesTableScan(
+        TableOperations ops, Table table, Schema schema, TableScanContext 
context) {
+      super(ops, table, schema, context);
+    }
+
+    @Override
+    protected PositionDeletesTableScan newRefinedScan(
+        TableOperations newOps, Table newTable, Schema newSchema, 
TableScanContext newContext) {
+      return new PositionDeletesTableScan(newOps, newTable, newSchema, 
newContext);
+    }
+
+    @Override
+    public CloseableIterable<ScanTaskGroup<ScanTask>> planTasks() {
+      CloseableIterable<ScanTask> scanTasks = planFiles();
+      return TableScanUtil.planTaskGroups(
+          scanTasks, targetSplitSize(), splitLookback(), splitOpenFileCost());
+    }
+
+    @Override
+    protected CloseableIterable<ScanTask> doPlanFiles() {
+      Expression rowFilter = context().rowFilter();
+      String schemaString = SchemaParser.toJson(tableSchema());
+
+      Map<Integer, PartitionSpec> transformedSpecs =
+          table().specs().entrySet().stream()
+              .map(
+                  e ->
+                      Pair.of(
+                          e.getKey(), 
BaseMetadataTable.transformSpec(tableSchema(), e.getValue())))
+              .collect(Collectors.toMap(Pair::first, Pair::second));
+
+      CloseableIterable<ManifestFile> deleteManifests =
+          
CloseableIterable.withNoopClose(snapshot().deleteManifests(tableOps().io()));
+      CloseableIterable<CloseableIterable<ScanTask>> results =
+          CloseableIterable.transform(
+              deleteManifests,
+              m -> {
+                // Filter partitions
+                CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries 
=
+                    ManifestFiles.readDeleteManifest(m, tableOps().io(), 
transformedSpecs)
+                        .caseSensitive(isCaseSensitive())
+                        .filterRows(rowFilter)
+                        .liveEntries();
+
+                // Filter delete file type
+                CloseableIterable<ManifestEntry<DeleteFile>> 
positionDeleteEntries =
+                    CloseableIterable.filter(
+                        deleteFileEntries,
+                        entry -> 
entry.file().content().equals(FileContent.POSITION_DELETES));
+
+                Types.StructType partitionType = 
Partitioning.partitionType(table());
+
+                return CloseableIterable.transform(
+                    positionDeleteEntries,
+                    entry -> {
+                      PartitionSpec spec = 
transformedSpecs.get(entry.file().specId());
+                      String specString = PartitionSpecParser.toJson(spec);
+                      return new PositionDeleteScanTask(
+                          entry.file().copyWithoutStats(),
+                          schemaString,
+                          specString,
+                          ResidualEvaluator.of(spec, Expressions.alwaysTrue(), 
isCaseSensitive()),

Review Comment:
   Hm, why do we always use `Expressions.alwaysTrue()`?
   Shouldn't we use `filter()`, respect `shouldIgnoreResiduals()`, and keep a
   cache of evaluators?
   I wonder if there is an easy way to adapt the logic in `ManifestGroup` so it
   also applies to delete manifests.
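
   A minimal sketch of the caching idea, using stand-in types rather than the
   real Iceberg classes. `ResidualCacheSketch`, `Evaluator`, and the string
   expressions are hypothetical; the string `"true"` stands in for
   `Expressions.alwaysTrue()`. It only illustrates building one evaluator per
   spec ID and falling back to an always-true expression when residuals are
   ignored.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   final class ResidualCacheSketch {
     // Hypothetical stand-in for ResidualEvaluator: records the spec ID and the
     // expression it was built for.
     static final class Evaluator {
       final int specId;
       final String expr;

       Evaluator(int specId, String expr) {
         this.specId = specId;
         this.expr = expr;
       }
     }

     private final Map<Integer, Evaluator> cache = new HashMap<>();
     private final String filter;
     private final boolean ignoreResiduals;
     private int built = 0;

     ResidualCacheSketch(String filter, boolean ignoreResiduals) {
       this.filter = filter;
       this.ignoreResiduals = ignoreResiduals;
     }

     // Returns the evaluator for a spec, building it only on first use.
     // When residuals are ignored, it is built for "true" instead of the filter.
     Evaluator forSpec(int specId) {
       return cache.computeIfAbsent(
           specId,
           id -> {
             built++;
             return new Evaluator(id, ignoreResiduals ? "true" : filter);
           });
     }

     // Number of evaluators actually constructed so far.
     int built() {
       return built;
     }

     public static void main(String[] args) {
       ResidualCacheSketch residuals = new ResidualCacheSketch("pos > 10", false);
       // Spec IDs of five delete files; only two distinct specs are involved.
       int[] specIdsOfDeleteFiles = {0, 1, 0, 0, 1};
       for (int specId : specIdsOfDeleteFiles) {
         residuals.forSpec(specId);
       }
       System.out.println("evaluators built: " + residuals.built());
     }
   }
   ```

   In the scan, the per-entry lambda would then look up the evaluator by
   `entry.file().specId()` instead of constructing a new one for every delete
   file.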



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static 
org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_FILE_PATH;
+import static 
org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_PARTITION_FIELD_ID;
+import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_SPEC_ID;
+
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  private final Schema schema;
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+    this.schema = calculateSchema();
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+    this.schema = calculateSchema();
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    throw new UnsupportedOperationException(
+        "Cannot create TableScan from table of type POSITION_DELETES");
+  }
+
+  @Override
+  public BatchScan newBatchScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return schema;
+  }
+
+  private Schema calculateSchema() {
+    Types.StructType partitionType = Partitioning.partitionType(table());
+    Schema result =
+        new Schema(
+            MetadataColumns.DELETE_FILE_PATH,
+            MetadataColumns.DELETE_FILE_POS,
+            Types.NestedField.optional(
+                MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
+                "row",
+                table().schema().asStruct(),
+                MetadataColumns.DELETE_FILE_ROW_DOC),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_PARTITION_FIELD_ID,
+                "partition",
+                partitionType,
+                "Partition that position delete row belongs to"),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_SPEC_ID,
+                "spec_id",
+                Types.IntegerType.get(),
+                "Spec ID of the file that the position delete row belongs to"),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_FILE_PATH,
+                "delete_file_path",
+                Types.StringType.get(),
+                "Spec ID of the file that the position delete row belongs 
to"));
+
+    if (partitionType.fields().size() > 0) {
+      return result;
+    } else {
+      // avoid returning an empty struct, which is not always supported. 
instead, drop the partition
+      // field
+      return TypeUtil.selectNot(result, 
Sets.newHashSet(POSITION_DELETE_TABLE_PARTITION_FIELD_ID));
+    }
+  }
+
+  public static class PositionDeletesTableScan
+      extends SnapshotScan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> 
implements BatchScan {
+
+    protected PositionDeletesTableScan(TableOperations ops, Table table, 
Schema schema) {
+      super(ops, table, schema, new TableScanContext());
+    }
+
+    protected PositionDeletesTableScan(
+        TableOperations ops, Table table, Schema schema, TableScanContext 
context) {
+      super(ops, table, schema, context);
+    }
+
+    @Override
+    protected PositionDeletesTableScan newRefinedScan(
+        TableOperations newOps, Table newTable, Schema newSchema, 
TableScanContext newContext) {
+      return new PositionDeletesTableScan(newOps, newTable, newSchema, 
newContext);
+    }
+
+    @Override
+    public CloseableIterable<ScanTaskGroup<ScanTask>> planTasks() {
+      CloseableIterable<ScanTask> scanTasks = planFiles();
+      return TableScanUtil.planTaskGroups(
+          scanTasks, targetSplitSize(), splitLookback(), splitOpenFileCost());
+    }
+
+    @Override
+    protected CloseableIterable<ScanTask> doPlanFiles() {
+      Expression rowFilter = context().rowFilter();
+      String schemaString = SchemaParser.toJson(tableSchema());
+
+      Map<Integer, PartitionSpec> transformedSpecs =
+          table().specs().entrySet().stream()
+              .map(
+                  e ->
+                      Pair.of(
+                          e.getKey(), 
BaseMetadataTable.transformSpec(tableSchema(), e.getValue())))
+              .collect(Collectors.toMap(Pair::first, Pair::second));
+
+      CloseableIterable<ManifestFile> deleteManifests =
+          
CloseableIterable.withNoopClose(snapshot().deleteManifests(tableOps().io()));
+      CloseableIterable<CloseableIterable<ScanTask>> results =
+          CloseableIterable.transform(
+              deleteManifests,
+              m -> {
+                // Filter partitions
+                CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries 
=
+                    ManifestFiles.readDeleteManifest(m, tableOps().io(), 
transformedSpecs)

Review Comment:
   Do we need to pass a projection while reading delete manifests?



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static 
org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_FILE_PATH;
+import static 
org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_PARTITION_FIELD_ID;
+import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_SPEC_ID;
+
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  private final Schema schema;
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+    this.schema = calculateSchema();
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+    this.schema = calculateSchema();
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    throw new UnsupportedOperationException(
+        "Cannot create TableScan from table of type POSITION_DELETES");
+  }
+
+  @Override
+  public BatchScan newBatchScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return schema;
+  }
+
+  private Schema calculateSchema() {
+    Types.StructType partitionType = Partitioning.partitionType(table());
+    Schema result =
+        new Schema(
+            MetadataColumns.DELETE_FILE_PATH,
+            MetadataColumns.DELETE_FILE_POS,
+            Types.NestedField.optional(
+                MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
+                "row",
+                table().schema().asStruct(),
+                MetadataColumns.DELETE_FILE_ROW_DOC),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_PARTITION_FIELD_ID,
+                "partition",
+                partitionType,
+                "Partition that position delete row belongs to"),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_SPEC_ID,
+                "spec_id",
+                Types.IntegerType.get(),
+                "Spec ID of the file that the position delete row belongs to"),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_FILE_PATH,
+                "delete_file_path",
+                Types.StringType.get(),
+                "Spec ID of the file that the position delete row belongs 
to"));
+
+    if (partitionType.fields().size() > 0) {
+      return result;
+    } else {
+      // avoid returning an empty struct, which is not always supported. 
instead, drop the partition
+      // field
+      return TypeUtil.selectNot(result, 
Sets.newHashSet(POSITION_DELETE_TABLE_PARTITION_FIELD_ID));
+    }
+  }
+
+  public static class PositionDeletesTableScan

Review Comment:
   nit: `PositionDeletesBatchScan`, since it is a `BatchScan`?



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static 
org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_FILE_PATH;
+import static 
org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_PARTITION_FIELD_ID;
+import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_SPEC_ID;
+
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  private final Schema schema;
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+    this.schema = calculateSchema();
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+    this.schema = calculateSchema();
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    throw new UnsupportedOperationException(
+        "Cannot create TableScan from table of type POSITION_DELETES");
+  }
+
+  @Override
+  public BatchScan newBatchScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return schema;
+  }
+
+  private Schema calculateSchema() {
+    Types.StructType partitionType = Partitioning.partitionType(table());
+    Schema result =
+        new Schema(
+            MetadataColumns.DELETE_FILE_PATH,
+            MetadataColumns.DELETE_FILE_POS,
+            Types.NestedField.optional(
+                MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
+                "row",
+                table().schema().asStruct(),
+                MetadataColumns.DELETE_FILE_ROW_DOC),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_PARTITION_FIELD_ID,
+                "partition",
+                partitionType,
+                "Partition that position delete row belongs to"),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_SPEC_ID,
+                "spec_id",
+                Types.IntegerType.get(),
+                "Spec ID of the file that the position delete row belongs to"),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_FILE_PATH,
+                "delete_file_path",
+                Types.StringType.get(),
+                "Spec ID of the file that the position delete row belongs 
to"));

Review Comment:
   nit: Wrong comment? This doc string repeats the `spec_id` description, but
   the field is `delete_file_path`.



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static 
org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_FILE_PATH;
+import static 
org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_PARTITION_FIELD_ID;
+import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_SPEC_ID;
+
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PartitionUtil;
+
+public class PositionDeletesTable extends BaseTable {
+
+  private final Table table;
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table.name() + ".position_deletes");
+    this.table = table;
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, name);
+    this.table = table;
+  }
+
+  protected Table table() {
+    return table;
+  }
+
+  @Override
+  public TableScan newScan() {
+    throw new UnsupportedOperationException(
+        "Cannot create TableScan from table of type POSITION_DELETES");
+  }
+
+  @Override
+  public BatchScan newBatchScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return PositionDeletesTable.schema(table(), 
Partitioning.partitionType(table()));
+  }
+
+  public static Schema schema(Table table, Types.StructType partitionType) {
+    Schema result =
+        new Schema(
+            MetadataColumns.DELETE_FILE_PATH,
+            MetadataColumns.DELETE_FILE_POS,
+            Types.NestedField.optional(
+                MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
+                "row",
+                table.schema().asStruct(),
+                MetadataColumns.DELETE_FILE_ROW_DOC),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_PARTITION_FIELD_ID,
+                "partition",
+                partitionType,
+                "Partition that position delete row belongs to"),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_SPEC_ID,
+                "spec_id",
+                Types.IntegerType.get(),
+                "Spec ID of the file that the position delete row belongs to"),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_FILE_PATH,
+                "delete_file_path",
+                Types.StringType.get(),
+                "Spec ID of the file that the position delete row belongs 
to"));
+
+    if (partitionType.fields().size() > 0) {
+      return result;
+    } else {
+      // avoid returning an empty struct, which is not always supported. 
instead, drop the partition
+      // field
+      return TypeUtil.selectNot(result, 
Sets.newHashSet(POSITION_DELETE_TABLE_PARTITION_FIELD_ID));
+    }
+  }
+
+  public static class PositionDeletesTableScan
+      extends AbstractTableScan<
+          BatchScan, org.apache.iceberg.ScanTask, 
ScanTaskGroup<org.apache.iceberg.ScanTask>>
+      implements BatchScan {
+
+    protected PositionDeletesTableScan(TableOperations ops, Table table, 
Schema schema) {
+      super(ops, table, schema, new TableScanContext());
+    }
+
+    protected PositionDeletesTableScan(
+        TableOperations ops, Table table, Schema schema, TableScanContext 
context) {
+      super(ops, table, schema, context);
+    }
+
+    @Override
+    protected PositionDeletesTableScan newRefinedScan(
+        TableOperations newOps, Table newTable, Schema newSchema, 
TableScanContext newContext) {
+      return new PositionDeletesTableScan(newOps, newTable, newSchema, 
newContext);
+    }
+
+    @Override
+    protected CloseableIterable<org.apache.iceberg.ScanTask> doPlanFiles() {
+      Expression rowFilter = context().rowFilter();
+      String schemaString = SchemaParser.toJson(tableSchema());
+
+      Map<Integer, PartitionSpec> transformedSpecs =
+          table().specs().entrySet().stream()
+              .map(
+                  e ->

Review Comment:
   Would this be easier to read?
   
   ```
   Map<Integer, PartitionSpec> transformedSpecs =
       table().specs().values().stream()
           .map(spec -> BaseMetadataTable.transformSpec(tableSchema(), spec))
           .collect(Collectors.toMap(PartitionSpec::specId, spec -> spec));
   ```
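
   For comparison, a self-contained sketch of the two shapes with stand-in
   types. `SpecMapSketch`, `Spec`, and `transform` are hypothetical, the
   entry-based form is simplified (no `Pair`), and the sketch assumes the
   transformed spec keeps the original spec ID, which the values-based form
   relies on.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.function.Function;
   import java.util.stream.Collectors;

   final class SpecMapSketch {
     // Hypothetical stand-in for PartitionSpec: only the ID and a label matter.
     static final class Spec {
       final int specId;
       final String label;

       Spec(int specId, String label) {
         this.specId = specId;
         this.label = label;
       }
     }

     // Hypothetical stand-in for BaseMetadataTable.transformSpec; assumed to
     // preserve the spec ID.
     static Spec transform(Spec spec) {
       return new Spec(spec.specId, "transformed:" + spec.label);
     }

     // Entry-based form: take the key from the original map entry.
     static Map<Integer, Spec> viaEntries(Map<Integer, Spec> specs) {
       return specs.entrySet().stream()
           .collect(Collectors.toMap(Map.Entry::getKey, e -> transform(e.getValue())));
     }

     // Values-based form: derive the key from the transformed spec itself.
     static Map<Integer, Spec> viaValues(Map<Integer, Spec> specs) {
       return specs.values().stream()
           .map(SpecMapSketch::transform)
           .collect(Collectors.toMap(spec -> spec.specId, Function.identity()));
     }

     public static void main(String[] args) {
       Map<Integer, Spec> specs = new LinkedHashMap<>();
       specs.put(0, new Spec(0, "unpartitioned"));
       specs.put(1, new Spec(1, "day(ts)"));
       // Both forms should yield the same set of spec IDs as keys.
       System.out.println(viaEntries(specs).keySet().equals(viaValues(specs).keySet()));
     }
   }
   ```

   The two forms only agree if each map key equals the spec's own ID and the
   transformation does not change it.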
   
   



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static 
org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_FILE_PATH;
+import static 
org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_PARTITION_FIELD_ID;
+import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_SPEC_ID;
+
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  private final Schema schema;
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+    this.schema = calculateSchema();
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+    this.schema = calculateSchema();
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    throw new UnsupportedOperationException(
+        "Cannot create TableScan from table of type POSITION_DELETES");
+  }
+
+  @Override
+  public BatchScan newBatchScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return schema;
+  }
+
+  private Schema calculateSchema() {
+    Types.StructType partitionType = Partitioning.partitionType(table());
+    Schema result =
+        new Schema(
+            MetadataColumns.DELETE_FILE_PATH,
+            MetadataColumns.DELETE_FILE_POS,
+            Types.NestedField.optional(
+                MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
+                "row",
+                table().schema().asStruct(),
+                MetadataColumns.DELETE_FILE_ROW_DOC),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_PARTITION_FIELD_ID,
+                "partition",
+                partitionType,
+                "Partition that position delete row belongs to"),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_SPEC_ID,
+                "spec_id",
+                Types.IntegerType.get(),
+                "Spec ID of the file that the position delete row belongs to"),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_FILE_PATH,
+                "delete_file_path",
+                Types.StringType.get(),
+                "Spec ID of the file that the position delete row belongs to"));
+
+    if (partitionType.fields().size() > 0) {
+      return result;
+    } else {
+      // avoid returning an empty struct, which is not always supported. instead, drop the partition
+      // field
+      return TypeUtil.selectNot(result, Sets.newHashSet(POSITION_DELETE_TABLE_PARTITION_FIELD_ID));
+    }
+  }
+
+  public static class PositionDeletesTableScan
+      extends SnapshotScan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> implements BatchScan {
+
+    protected PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+      super(ops, table, schema, new TableScanContext());
+    }
+
+    protected PositionDeletesTableScan(
+        TableOperations ops, Table table, Schema schema, TableScanContext context) {
+      super(ops, table, schema, context);
+    }
+
+    @Override
+    protected PositionDeletesTableScan newRefinedScan(
+        TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+      return new PositionDeletesTableScan(newOps, newTable, newSchema, newContext);
+    }
+
+    @Override
+    public CloseableIterable<ScanTaskGroup<ScanTask>> planTasks() {
+      CloseableIterable<ScanTask> scanTasks = planFiles();
+      return TableScanUtil.planTaskGroups(
+          scanTasks, targetSplitSize(), splitLookback(), splitOpenFileCost());
+    }
+
+    @Override
+    protected CloseableIterable<ScanTask> doPlanFiles() {
+      Expression rowFilter = context().rowFilter();
+      String schemaString = SchemaParser.toJson(tableSchema());
+
+      Map<Integer, PartitionSpec> transformedSpecs =
+          table().specs().entrySet().stream()
+              .map(
+                  e ->
+                      Pair.of(
+                          e.getKey(), BaseMetadataTable.transformSpec(tableSchema(), e.getValue())))
+              .collect(Collectors.toMap(Pair::first, Pair::second));
+
+      CloseableIterable<ManifestFile> deleteManifests =
+          CloseableIterable.withNoopClose(snapshot().deleteManifests(tableOps().io()));
+      CloseableIterable<CloseableIterable<ScanTask>> results =
+          CloseableIterable.transform(
+              deleteManifests,
+              m -> {
+                // Filter partitions
+                CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries =
+                    ManifestFiles.readDeleteManifest(m, tableOps().io(), transformedSpecs)
+                        .caseSensitive(isCaseSensitive())
+                        .filterRows(rowFilter)
+                        .liveEntries();
+
+                // Filter delete file type
+                CloseableIterable<ManifestEntry<DeleteFile>> positionDeleteEntries =
+                    CloseableIterable.filter(
+                        deleteFileEntries,
+                        entry -> entry.file().content().equals(FileContent.POSITION_DELETES));
+
+                Types.StructType partitionType = Partitioning.partitionType(table());
+
+                return CloseableIterable.transform(
+                    positionDeleteEntries,
+                    entry -> {
+                      PartitionSpec spec = transformedSpecs.get(entry.file().specId());
+                      String specString = PartitionSpecParser.toJson(spec);
+                      return new PositionDeleteScanTask(
+                          entry.file().copyWithoutStats(),
+                          schemaString,
+                          specString,
+                          ResidualEvaluator.of(spec, Expressions.alwaysTrue(), isCaseSensitive()),
+                          partitionType);
+                    });
+              });
+
+      return new ParallelIterable<>(results, planExecutor());
+    }
+  }
+
+  /** Scan task for position delete files */
+  public static class PositionDeleteScanTask

Review Comment:
   Do we need to add a public interface `PositionDeletesScanTask` and then 
implement it here? That seems to be what we do for other tasks. We don't expose 
classes directly.
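
To illustrate the shape I have in mind, here is a rough, self-contained sketch. It is not the Iceberg API: `PositionDeletesScanTaskSketch`, `PositionDeletesTableSketch`, `BaseTask`, and the two accessors are hypothetical names chosen for the example. In a real layout the interface would be `public` and live in its own file; it is package-private here only so the sketch fits in one file.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical task contract that callers would depend on (assumed name, not Iceberg's). */
interface PositionDeletesScanTaskSketch {
  /** Path of the position delete file this task reads. */
  String deleteFilePath();

  /** ID of the partition spec the delete file was written with. */
  int specId();
}

/** Hypothetical stand-in for the metadata table; it only hands out the interface type. */
final class PositionDeletesTableSketch {
  private PositionDeletesTableSketch() {}

  /** Plans one task per delete file path; the concrete task class never leaks out. */
  static List<PositionDeletesScanTaskSketch> planTasks(List<String> paths, int specId) {
    List<PositionDeletesScanTaskSketch> tasks = new ArrayList<>();
    for (String path : paths) {
      tasks.add(new BaseTask(path, specId));
    }
    return tasks;
  }

  /** Private implementation: free to change without breaking callers. */
  private static final class BaseTask implements PositionDeletesScanTaskSketch {
    private final String deleteFilePath;
    private final int specId;

    private BaseTask(String deleteFilePath, int specId) {
      this.deleteFilePath = deleteFilePath;
      this.specId = specId;
    }

    @Override
    public String deleteFilePath() {
      return deleteFilePath;
    }

    @Override
    public int specId() {
      return specId;
    }
  }

  public static void main(String[] args) {
    // Callers only ever see the interface type.
    for (PositionDeletesScanTaskSketch task : planTasks(List.of("deletes-1.parquet"), 0)) {
      System.out.println(task.deleteFilePath() + " (spec " + task.specId() + ")");
    }
  }
}
```

With this split, readers and planners compile against the interface only, so the implementation class can stay non-public and evolve freely.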



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

