aokolnychyi commented on code in PR #11561:
URL: https://github.com/apache/iceberg/pull/11561#discussion_r1859282133


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java:
##########
@@ -168,7 +168,7 @@ protected Map<String, DeleteFileSet> rewritableDeletes() {
     for (ScanTask task : tasks()) {
       FileScanTask fileScanTask = task.asFileScanTask();
       for (DeleteFile deleteFile : fileScanTask.deletes()) {
-        if (ContentFileUtil.isFileScoped(deleteFile)) {
+        if (ContentFileUtil.isFileScoped(deleteFile) || ContentFileUtil.isDV(deleteFile)) {

Review Comment:
   Aren't DVs considered file-scoped? I think `isFileScoped` will always be 
true for DVs, which would make the extra `isDV` check redundant (please check).



##########
core/src/main/java/org/apache/iceberg/io/PartitioningDVWriter.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.function.Function;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.BaseDVFileWriter;
+import org.apache.iceberg.deletes.DVFileWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * PartitioningDVWriter is a PartitioningWriter implementation that writes DVs for a file position
+ */
+public class PartitioningDVWriter<T>
+    implements PartitioningWriter<PositionDelete<T>, DeleteWriteResult> {
+  private final DVFileWriter fileWriter;
+  private DeleteWriteResult result;
+
+  public PartitioningDVWriter(
+      OutputFileFactory fileFactory,
+      Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes) {
+    this.fileWriter = new BaseDVFileWriter(fileFactory, loadPreviousDeletes::apply);
+  }
+
+  @Override
+  public void write(PositionDelete<T> row, PartitionSpec spec, StructLike partition) {
+    fileWriter.delete(row.path().toString(), row.pos(), spec, partition);
+  }
+
+  @Override
+  public DeleteWriteResult result() {
+    Preconditions.checkState(result != null, "Cannot get result from unclosed writer");
+    return result;
+  }
+
+  @Override
+  public void close() throws IOException {
+    fileWriter.close();

Review Comment:
   I know we usually check if `result` is null to see if the writer was closed 
and we are not re-assigning the result object. It probably works in this case 
(given the underlying implementation), but I wonder whether making this check 
explicit would make the behavior of this method clear without having to look at 
how the delegate writer works.
   
   ```
   if (result == null) {
     fileWriter.close();
     this.result = fileWriter.result();
   }
   ```
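A minimal, self-contained sketch of the guarded `close()` suggested above. `CountingDelegate` and `GuardedWriter` are hypothetical stand-ins, not Iceberg classes, and a `String` stands in for `DeleteWriteResult`. It only illustrates that the null check makes a repeated `close()` a no-op, whatever the delegate does.

```java
// Hypothetical stand-in for the delegate writer; not an Iceberg class.
class CountingDelegate implements AutoCloseable {
  int closeCalls = 0;

  @Override
  public void close() {
    closeCalls++;
  }

  String result() {
    return "result-after-close-" + closeCalls;
  }
}

// Wrapper whose close() is idempotent without relying on the delegate.
class GuardedWriter implements AutoCloseable {
  private final CountingDelegate delegate;
  private String result; // stays null until the writer has been closed

  GuardedWriter(CountingDelegate delegate) {
    this.delegate = delegate;
  }

  @Override
  public void close() {
    // Close the delegate and capture its result only on the first call.
    if (result == null) {
      delegate.close();
      this.result = delegate.result();
    }
  }

  String result() {
    if (result == null) {
      throw new IllegalStateException("Cannot get result from unclosed writer");
    }
    return result;
  }
}
```

With this shape, calling `close()` twice closes the delegate once, and `result()` keeps returning the value captured on the first close.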



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -483,6 +493,14 @@ protected PartitioningWriter<PositionDelete<InternalRow>, DeleteWriteResult> new
         SparkFileWriterFactory writers,

Review Comment:
   Shall we update the comment above this method to mention DVs?



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -191,6 +195,12 @@ private Broadcast<Map<String, DeleteFileSet>> broadcastRewritableDeletes() {
       return null;
     }
 
+    private boolean shouldBroadcastRewritableDeletes(TableOperations ops) {

Review Comment:
   What about `shouldRewriteDeletes` as a shorter name? Also, I'd consider 
moving the logic for detecting whether DVs should be used to `SparkWriteConf`, 
where we can eventually call Eduard's utility method. We can then propagate 
this flag to the `Context` object within `SparkPositionDeltaWrite`.
   
   ```
   private boolean shouldRewriteDeletes() {
     return context.useDVs() || context.deleteGranularity() == DeleteGranularity.FILE;
   }
   ```
   
   With a method in `SparkWriteConf`:
   
   ```
   public boolean useDVs() {
     // call a utility method on Table
   }
   ```
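To make the propagation concrete, here is a rough sketch of the flag flowing from the write conf into the context and then into the rewrite decision. `WriteConfSketch`, `ContextSketch`, `DeltaWriteSketch`, and `Granularity` are hypothetical stand-ins for `SparkWriteConf`, `Context`, `SparkPositionDeltaWrite`, and `DeleteGranularity`. The `formatVersion >= 3` rule is an assumption that mirrors the check currently in this PR; the real `useDVs()` would call the utility method on the table instead.

```java
// Hypothetical stand-in for DeleteGranularity.
enum Granularity {
  FILE,
  PARTITION
}

// Hypothetical stand-in for SparkWriteConf: the only place that decides whether DVs are used.
class WriteConfSketch {
  private final int formatVersion;
  private final Granularity granularity;

  WriteConfSketch(int formatVersion, Granularity granularity) {
    this.formatVersion = formatVersion;
    this.granularity = granularity;
  }

  boolean useDVs() {
    // Assumption: mirrors the formatVersion() >= 3 check in this PR.
    return formatVersion >= 3;
  }

  Granularity deleteGranularity() {
    return granularity;
  }
}

// Hypothetical stand-in for Context: captures the conf values once.
class ContextSketch {
  private final boolean useDVs;
  private final Granularity deleteGranularity;

  ContextSketch(WriteConfSketch conf) {
    this.useDVs = conf.useDVs();
    this.deleteGranularity = conf.deleteGranularity();
  }

  boolean useDVs() {
    return useDVs;
  }

  Granularity deleteGranularity() {
    return deleteGranularity;
  }
}

// The write consults only the context and never touches table operations.
class DeltaWriteSketch {
  private final ContextSketch context;

  DeltaWriteSketch(ContextSketch context) {
    this.context = context;
  }

  boolean shouldRewriteDeletes() {
    return context.useDVs() || context.deleteGranularity() == Granularity.FILE;
  }
}
```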



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -483,6 +493,14 @@ protected PartitioningWriter<PositionDelete<InternalRow>, DeleteWriteResult> new
         SparkFileWriterFactory writers,
         OutputFileFactory files,
         Context context) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      Function<CharSequence, PositionDeleteIndex> previousDeleteLoader =
+          rewritableDeletes != null
+              ? new PreviousDeleteLoader(table, rewritableDeletes)
+              : path -> null;
+      if (ops.current().formatVersion() >= 3) {

Review Comment:
   We can reuse `context.useDVs()` here as well. I'd also co-locate all return 
statements into one if-else block.
   
   ```
   FileIO io = table.io();
   boolean inputOrdered = context.inputOrdered();
   long targetFileSize = context.targetDeleteFileSize();
   DeleteGranularity deleteGranularity = context.deleteGranularity();
   boolean useDVs = context.useDVs();
   
   if (useDVs) {
     return new PartitioningDVWriter<>(...);
   } else if (inputOrdered && rewritableDeletes == null) {
     return new ClusteredPositionDeleteWriter<>(...);
   } else {
     return new FanoutPositionOnlyDeleteWriter<>(...);
   }
   ```
   
   If you think that extra variables make things more complicated, we can 
remove them and use `context` directly. Up to you.
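The branching above can also be checked in isolation. The sketch below reduces it to a pure function over three booleans; `WriterKind` and `DeleteWriterChooser` are hypothetical names that exist only for illustration, and the enum constants stand in for the three writer classes.

```java
// Hypothetical labels for the three writers named in the snippet above.
enum WriterKind {
  DV,
  CLUSTERED,
  FANOUT
}

class DeleteWriterChooser {
  // Mirrors the single if-else block: DVs win, then ordered input without
  // rewritable deletes uses the clustered writer, everything else fans out.
  static WriterKind choose(boolean useDVs, boolean inputOrdered, boolean hasRewritableDeletes) {
    if (useDVs) {
      return WriterKind.DV;
    } else if (inputOrdered && !hasRewritableDeletes) {
      return WriterKind.CLUSTERED;
    } else {
      return WriterKind.FANOUT;
    }
  }
}
```

Keeping all three returns in one block makes it easy to see that every combination of flags maps to exactly one writer.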



##########
core/src/main/java/org/apache/iceberg/io/PartitioningDVWriter.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.function.Function;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.BaseDVFileWriter;
+import org.apache.iceberg.deletes.DVFileWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * PartitioningDVWriter is a PartitioningWriter implementation which writes DVs for a given file
+ * position
+ */
+public class PartitioningDVWriter<T>

Review Comment:
   I think this class more or less fits what we expect from a 
`PartitioningWriter` (the ability to write to multiple specs and partitions). 
One downside is the need to wrap and unwrap deletes into `PositionDelete`, 
which is probably minor. Let me see how we consume this API in Spark.



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -483,6 +493,14 @@ protected PartitioningWriter<PositionDelete<InternalRow>, DeleteWriteResult> new
         SparkFileWriterFactory writers,
         OutputFileFactory files,
         Context context) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      Function<CharSequence, PositionDeleteIndex> previousDeleteLoader =

Review Comment:
   What about a helper method like `createPreviousDeleteLoader` or 
`PreviousDeleteLoader.create`?
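One possible shape for such a helper, as a rough sketch only. `PreviousDeleteLoaderSketch` is a hypothetical class, `Set<Long>` stands in for `PositionDeleteIndex`, and the map from data file path to deleted positions is a simplification of the `Map<String, DeleteFileSet>` used in the PR.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

class PreviousDeleteLoaderSketch {
  // Factory that hides the null check from the caller: with no rewritable
  // deletes, the returned loader reports no previous deletes for every path.
  static Function<CharSequence, Set<Long>> create(Map<String, Set<Long>> rewritableDeletes) {
    if (rewritableDeletes == null) {
      return path -> null;
    }
    return path -> rewritableDeletes.get(path.toString());
  }
}
```

The call site then becomes a single assignment with no ternary.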



##########
core/src/main/java/org/apache/iceberg/io/PartitioningDVWriter.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.function.Function;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.BaseDVFileWriter;
+import org.apache.iceberg.deletes.DVFileWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * PartitioningDVWriter is a PartitioningWriter implementation which writes DVs for a given file
+ * position
+ */
+public class PartitioningDVWriter<T>

Review Comment:
   This generally makes sense to me, and I like that it simplifies the Spark 
side. The alternative would be to expose a custom implementation of 
`PositionDeltaWriter` for DVs. While it would not need `PositionDelete` 
wrappers, it would require more changes in Spark. Let's stick to what this PR 
has; it should be the simpler option.



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java:
##########
@@ -168,7 +168,7 @@ protected Map<String, DeleteFileSet> rewritableDeletes() {
     for (ScanTask task : tasks()) {
       FileScanTask fileScanTask = task.asFileScanTask();
       for (DeleteFile deleteFile : fileScanTask.deletes()) {
-        if (ContentFileUtil.isFileScoped(deleteFile)) {
+        if (ContentFileUtil.isFileScoped(deleteFile) || ContentFileUtil.isDV(deleteFile)) {

Review Comment:
   Actually, if DVs are enabled, we have to include all position deletes, not 
just file-scoped ones. The first DV ever produced must include all previous 
deletes. The only difference is that we can drop file-scoped deletes from the 
table state (decided in the writer) but still have to keep partition-scoped 
deletes, as they may apply to other data files.
   
   Can we please test migration to DVs for tables that have a mix of 
file-scoped and partition-scoped position deletes?
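To spell out the rule such a test would exercise, here is a small sketch under simplifying assumptions. `DeleteRef` and `DeleteMigrationPlan` are hypothetical types, not Iceberg classes, and the `fileScoped` flag stands in for whatever `ContentFileUtil.isFileScoped` reports. The idea: with DVs every existing position delete feeds the new DV, but only file-scoped ones may be removed from the table state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified view of an existing position delete file.
class DeleteRef {
  final String location;
  final boolean fileScoped;

  DeleteRef(String location, boolean fileScoped) {
    this.location = location;
    this.fileScoped = fileScoped;
  }
}

class DeleteMigrationPlan {
  // Deletes whose positions must be merged into the newly written deletes.
  final List<DeleteRef> toMerge = new ArrayList<>();
  // Deletes that can be dropped from the table state after the rewrite.
  final List<DeleteRef> toRemove = new ArrayList<>();

  static DeleteMigrationPlan plan(List<DeleteRef> existing, boolean useDVs) {
    DeleteMigrationPlan plan = new DeleteMigrationPlan();
    for (DeleteRef delete : existing) {
      // With DVs, all previous deletes are included, not just file-scoped ones.
      if (useDVs || delete.fileScoped) {
        plan.toMerge.add(delete);
      }
      // Partition-scoped deletes may apply to other data files, so they stay.
      if (delete.fileScoped) {
        plan.toRemove.add(delete);
      }
    }
    return plan;
  }
}
```

For a table with one file-scoped and one partition-scoped delete, the DV path merges both and removes only the file-scoped one.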



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -191,6 +195,12 @@ private Broadcast<Map<String, DeleteFileSet>> broadcastRewritableDeletes() {
       return null;
     }
 
+    private boolean shouldBroadcastRewritableDeletes(TableOperations ops) {

Review Comment:
   The point I am trying to make is that accessing `TableOperations` here is 
too low-level.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

