aokolnychyi commented on code in PR #11561:
URL: https://github.com/apache/iceberg/pull/11561#discussion_r1872486092


##########
core/src/main/java/org/apache/iceberg/io/PartitioningDVWriter.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.function.Function;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.BaseDVFileWriter;
+import org.apache.iceberg.deletes.DVFileWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * PartitioningDVWriter is a PartitioningWriter implementation that writes DVs for a file position

Review Comment:
   Do we need `.` at the end? I am also not sure I understand `writes DVs for a file position`. Is there a way to rephrase the description a bit?



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -182,15 +183,20 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
     }
 
     private Broadcast<Map<String, DeleteFileSet>> broadcastRewritableDeletes() {
-      if (context.deleteGranularity() == DeleteGranularity.FILE && scan != null) {
-        Map<String, DeleteFileSet> rewritableDeletes = scan.rewritableDeletes();
+      if (scan != null && shouldRewriteDeletes()) {
+        Map<String, DeleteFileSet> rewritableDeletes = scan.rewritableDeletes(context.useDVs());
         if (rewritableDeletes != null && !rewritableDeletes.isEmpty()) {
           return sparkContext.broadcast(rewritableDeletes);
         }
       }
       return null;
     }
 
+    private boolean shouldRewriteDeletes() {
+      // Rewritable deletes can either be DVs or file scoped position deletes

Review Comment:
   Minor: Other comments seem to start with lower case letters.



##########
core/src/main/java/org/apache/iceberg/io/PartitioningDVWriter.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.function.Function;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.BaseDVFileWriter;
+import org.apache.iceberg.deletes.DVFileWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * PartitioningDVWriter is a PartitioningWriter implementation that writes DVs for a file position
+ */
+public class PartitioningDVWriter<T>
+    implements PartitioningWriter<PositionDelete<T>, DeleteWriteResult> {
+  private final DVFileWriter fileWriter;

Review Comment:
   Optional: What about just `writer` and an empty line before to separate the block with fields as the class definition is split on multiple lines?



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java:
##########
@@ -162,13 +162,14 @@ public void filter(Predicate[] predicates) {
     }
   }
 
-  protected Map<String, DeleteFileSet> rewritableDeletes() {
+  protected Map<String, DeleteFileSet> rewritableDeletes(boolean forDVs) {
     Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();
 
     for (ScanTask task : tasks()) {
       FileScanTask fileScanTask = task.asFileScanTask();
       for (DeleteFile deleteFile : fileScanTask.deletes()) {
-        if (ContentFileUtil.isFileScoped(deleteFile)) {
+        // Both file-scoped and partition-scoped position deletes must be rewritten for DVs
+        if (forDVs || ContentFileUtil.isFileScoped(deleteFile)) {

Review Comment:
   It would be nice to also cover this with tests.



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java:
##########
@@ -162,13 +162,14 @@ public void filter(Predicate[] predicates) {
     }
   }
 
-  protected Map<String, DeleteFileSet> rewritableDeletes() {
+  protected Map<String, DeleteFileSet> rewritableDeletes(boolean forDVs) {
     Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();
 
     for (ScanTask task : tasks()) {
       FileScanTask fileScanTask = task.asFileScanTask();
       for (DeleteFile deleteFile : fileScanTask.deletes()) {
-        if (ContentFileUtil.isFileScoped(deleteFile)) {
+        // Both file-scoped and partition-scoped position deletes must be rewritten for DVs

Review Comment:
   Minor: Other comments in this class start with lower case letters.



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -182,15 +183,20 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
     }
 
     private Broadcast<Map<String, DeleteFileSet>> broadcastRewritableDeletes() {
-      if (context.deleteGranularity() == DeleteGranularity.FILE && scan != null) {
-        Map<String, DeleteFileSet> rewritableDeletes = scan.rewritableDeletes();
+      if (scan != null && shouldRewriteDeletes()) {
+        Map<String, DeleteFileSet> rewritableDeletes = scan.rewritableDeletes(context.useDVs());
         if (rewritableDeletes != null && !rewritableDeletes.isEmpty()) {
           return sparkContext.broadcast(rewritableDeletes);
         }
       }
       return null;
     }
 
+    private boolean shouldRewriteDeletes() {
+      // Rewritable deletes can either be DVs or file scoped position deletes

Review Comment:
   I think a more accurate description would be that we have to rewrite deletes when dealing with DVs and file-scoped deletes.
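
   The reviewer's reading of the condition can be sketched as a pure predicate (names here are illustrative, not the actual Iceberg API):

   ```java
   class DeleteRewriteCondition {
       // deletes have to be rewritten when writing DVs (existing position
       // deletes are folded into DVs) or when delete granularity is file-scoped
       static boolean shouldRewriteDeletes(boolean useDVs, boolean fileGranularity) {
           return useDVs || fileGranularity;
       }
   }
   ```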



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java:
##########
@@ -162,13 +162,14 @@ public void filter(Predicate[] predicates) {
     }
   }
 
-  protected Map<String, DeleteFileSet> rewritableDeletes() {
+  protected Map<String, DeleteFileSet> rewritableDeletes(boolean forDVs) {
     Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();
 
     for (ScanTask task : tasks()) {
       FileScanTask fileScanTask = task.asFileScanTask();
       for (DeleteFile deleteFile : fileScanTask.deletes()) {
-        if (ContentFileUtil.isFileScoped(deleteFile)) {
+        // Both file-scoped and partition-scoped position deletes must be rewritten for DVs
+        if (forDVs || ContentFileUtil.isFileScoped(deleteFile)) {

Review Comment:
   What about equality deletes? Those should be ignored. If so, what about a `shouldRewrite` method?
   
   ```
   for (DeleteFile deleteFile : fileScanTask.deletes()) {
     if (shouldRewrite(deleteFile, forDVs)) {
       ...
     }
   }
   ```
   ```
   private boolean shouldRewrite(DeleteFile deleteFile, boolean dvsEnabled) {
     ...
   }
   ```
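
   A self-contained sketch of what such a helper might look like, using a hypothetical stand-in for Iceberg's `DeleteFile` (the real code would check `deleteFile.content()` and `ContentFileUtil.isFileScoped(deleteFile)`; the class and field names below are assumptions for illustration):

   ```java
   // Hypothetical stand-in for org.apache.iceberg.DeleteFile
   class DeleteFileStub {
       enum Content { POSITION_DELETES, EQUALITY_DELETES }

       final Content content;
       final boolean fileScoped;

       DeleteFileStub(Content content, boolean fileScoped) {
           this.content = content;
           this.fileScoped = fileScoped;
       }
   }

   class RewriteCheck {
       // equality deletes are never rewritten; position deletes are rewritten
       // either when DVs are enabled (all scopes) or when they are file-scoped
       static boolean shouldRewrite(DeleteFileStub deleteFile, boolean dvsEnabled) {
           if (deleteFile.content != DeleteFileStub.Content.POSITION_DELETES) {
               return false; // ignore equality deletes
           }
           return dvsEnabled || deleteFile.fileScoped;
       }
   }
   ```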



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

