aokolnychyi commented on code in PR #11273:
URL: https://github.com/apache/iceberg/pull/11273#discussion_r1792506398


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java:
##########
@@ -158,6 +163,26 @@ public void filter(Predicate[] predicates) {
     }
   }
 
+  protected Map<String, DeleteFileSet> dataToFileScopedDeletes() {
+    if (dataToFileScopedDeletes == null) {
+      dataToFileScopedDeletes = Maps.newHashMap();
+      for (ScanTask task : tasks()) {
+        FileScanTask fileScanTask = task.asFileScanTask();
+        List<DeleteFile> fileScopedDeletes =
+            fileScanTask.deletes().stream()
+                .filter(file -> ContentFileUtil.referencedDataFileLocation(file) != null)
+                .collect(Collectors.toList());

Review Comment:
   What about using a plain for loop instead, to avoid the extra list holding the temporary state? Streams also add more overhead.
   
   ```
   for (ScanTask task : tasks()) {
     FileScanTask fileTask = task.asFileScanTask();
     for (DeleteFile deleteFile : fileTask.deletes()) {
       if (ContentFileUtil.isFileScoped(deleteFile)) {
         rewritableDeletes
          .computeIfAbsent(fileTask.file().location(), ignored -> DeleteFileSet.create())
             .add(deleteFile);
       }
     }
   }
   ```



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java:
##########
@@ -158,6 +163,26 @@ public void filter(Predicate[] predicates) {
     }
   }
 
+  protected Map<String, DeleteFileSet> dataToFileScopedDeletes() {

Review Comment:
   Let's use a more generic name like `rewritableDeletes`. We will use it for 
V3 tables as well where we will need to rewrite partition-scoped deletes during 
the first ever V3 write.



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java:
##########
@@ -68,6 +72,7 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
   private final Long asOfTimestamp;
   private final String tag;
   private final List<Expression> runtimeFilterExpressions;
+  private Map<String, DeleteFileSet> dataToFileScopedDeletes;

Review Comment:
   Don't we access it once? Why keep a reference to it in the scan?
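   
   For example, something along these lines (just a sketch, assuming the result is consumed only once when the writer factory is created):
   
   ```
   // compute the map on demand instead of caching it in a field on the scan
   protected Map<String, DeleteFileSet> rewritableDeletes() {
     Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();
     for (ScanTask task : tasks()) {
       FileScanTask fileTask = task.asFileScanTask();
       for (DeleteFile deleteFile : fileTask.deletes()) {
         if (ContentFileUtil.referencedDataFileLocation(deleteFile) != null) {
           rewritableDeletes
               .computeIfAbsent(fileTask.file().location(), ignored -> DeleteFileSet.create())
               .add(deleteFile);
         }
       }
     }
     return rewritableDeletes;
   }
   ```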



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java:
##########
@@ -158,6 +163,26 @@ public void filter(Predicate[] predicates) {
     }
   }
 
+  protected Map<String, DeleteFileSet> dataToFileScopedDeletes() {
+    if (dataToFileScopedDeletes == null) {
+      dataToFileScopedDeletes = Maps.newHashMap();
+      for (ScanTask task : tasks()) {
+        FileScanTask fileScanTask = task.asFileScanTask();
+        List<DeleteFile> fileScopedDeletes =
+            fileScanTask.deletes().stream()
+                .filter(file -> ContentFileUtil.referencedDataFileLocation(file) != null)
+                .collect(Collectors.toList());
+        for (DeleteFile deleteFile : fileScopedDeletes) {
+          dataToFileScopedDeletes
+              .computeIfAbsent(fileScanTask.file().location(), ignored -> DeleteFileSet.create())
+              .add(deleteFile);
+        }
+      }
+    }
+
+    return dataToFileScopedDeletes;

Review Comment:
   I'd also consider returning null and not doing a broadcast if we didn't find 
any matching deletes to rewrite.
   
   ```
    return rewritableDeletes.isEmpty() ? null : rewritableDeletes;
   ```



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -336,31 +356,40 @@ DeleteFile[] deleteFiles() {
       return deleteFiles;
     }
 
+    DeleteFile[] rewrittenDeleteFiles() {
+      return rewrittenDeleteFiles;
+    }
+
     CharSequence[] referencedDataFiles() {
       return referencedDataFiles;
     }
   }
 
  private static class PositionDeltaWriteFactory implements DeltaWriterFactory {
     private final Broadcast<Table> tableBroadcast;
+    private final Broadcast<Map<String, DeleteFileSet>> dataFileToFileScopedDeletesBroadcast;
     private final Command command;
     private final Context context;
     private final Map<String, String> writeProperties;
 
     PositionDeltaWriteFactory(
         Broadcast<Table> tableBroadcast,
+        Broadcast<Map<String, DeleteFileSet>> dataFileToFileScopedDeletesBroadcast,
         Command command,
         Context context,
         Map<String, String> writeProperties) {
       this.tableBroadcast = tableBroadcast;
       this.command = command;
       this.context = context;
       this.writeProperties = writeProperties;
+      this.dataFileToFileScopedDeletesBroadcast = dataFileToFileScopedDeletesBroadcast;

Review Comment:
   Minor: let's initialize the fields in the constructor in the same order they are declared.
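   
   Roughly like this (sketch, with the assignments following the field declaration order):
   
   ```
   PositionDeltaWriteFactory(
       Broadcast<Table> tableBroadcast,
       Broadcast<Map<String, DeleteFileSet>> dataFileToFileScopedDeletesBroadcast,
       Command command,
       Context context,
       Map<String, String> writeProperties) {
     this.tableBroadcast = tableBroadcast;
     this.dataFileToFileScopedDeletesBroadcast = dataFileToFileScopedDeletesBroadcast;
     this.command = command;
     this.context = context;
     this.writeProperties = writeProperties;
   }
   ```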



##########
core/src/main/java/org/apache/iceberg/TableProperties.java:
##########
@@ -383,4 +383,8 @@ private TableProperties() {}
   public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16;
 
   public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16;
+
+  public static final String MAINTAIN_POSITION_DELETES_DURING_WRITE =

Review Comment:
   I am not sure we need this configuration. It is pretty clear we want to 
always maintain position deletes. We will also not support this property in V3. 
Given that we want to switch to file-scoped position deletes in V2 tables by 
default, I think we should just always maintain deletes.



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java:
##########
@@ -158,6 +163,26 @@ public void filter(Predicate[] predicates) {
     }
   }
 
+  protected Map<String, DeleteFileSet> dataToFileScopedDeletes() {
+    if (dataToFileScopedDeletes == null) {
+      dataToFileScopedDeletes = Maps.newHashMap();
+      for (ScanTask task : tasks()) {
+        FileScanTask fileScanTask = task.asFileScanTask();
+        List<DeleteFile> fileScopedDeletes =
+            fileScanTask.deletes().stream()
+                .filter(file -> ContentFileUtil.referencedDataFileLocation(file) != null)

Review Comment:
   It looks like we will need this in a few places. What about adding 
`isFileScoped` to `ContentFileUtil`? I used the same logic in the fanout writer 
PR.
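   
   For illustration, the helper could be as simple as this (a sketch; it doesn't exist yet and just mirrors the predicate used here):
   
   ```
   // in ContentFileUtil
   public static boolean isFileScoped(DeleteFile deleteFile) {
     return referencedDataFileLocation(deleteFile) != null;
   }
   ```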



##########
core/src/main/java/org/apache/iceberg/TableProperties.java:
##########
@@ -383,4 +383,8 @@ private TableProperties() {}
   public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16;
 
   public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16;
+
+  public static final String MAINTAIN_POSITION_DELETES_DURING_WRITE =

Review Comment:
   In other words, switching `write.delete.granularity` to `file` should 
trigger maintenance.
   If set to `partition`, skip it as we can't do it safely.
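   
   In code, the check could be as simple as this (sketch; the variable names are assumptions):
   
   ```
   // only file-scoped granularity makes it safe to rewrite deletes during the write
   boolean maintainDeletes = deleteGranularity == DeleteGranularity.FILE;
   ```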



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -185,6 +196,7 @@ public void commit(WriterCommitMessage[] messages) {
 
       int addedDataFilesCount = 0;
       int addedDeleteFilesCount = 0;
+      int removedDeleteFilesCount = 0;

Review Comment:
   Is this actually being used? I think it is very helpful but we have to 
update the commit/log message template.
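   
   Something along these lines, for example (a sketch only; the exact wording of today's template is an assumption):
   
   ```
   LOG.info(
       "Committing delta with {} new data files, {} new delete files and {} removed delete files",
       addedDataFilesCount, addedDeleteFilesCount, removedDeleteFilesCount);
   ```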



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -449,8 +493,33 @@ protected PartitioningWriter<PositionDelete<InternalRow>, DeleteWriteResult> new
             writers, files, io, targetFileSize, deleteGranularity);
       } else {
         return new FanoutPositionOnlyDeleteWriter<>(
-            writers, files, io, targetFileSize, deleteGranularity);
+            writers,

Review Comment:
   We have to use `FanoutPositionOnlyDeleteWriter` whenever the `Map<String, DeleteFileSet>` of rewritable deletes is not null, even if the input is ordered.
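   
   Roughly (sketch; `rewritableDeletes` stands for the broadcast map and the extra constructor args are placeholders):
   
   ```
   if (rewritableDeletes != null) {
     // fan out whenever deletes must be rewritten, regardless of input ordering
     return new FanoutPositionOnlyDeleteWriter<>(
         writers, files, io, targetFileSize, deleteGranularity /* + rewrite-related args */);
   }
   ```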



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -169,7 +174,13 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
       // broadcast the table metadata as the writer factory will be sent to executors
       Broadcast<Table> tableBroadcast =
           sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
-      return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties);
+      Broadcast<Map<String, DeleteFileSet>> dataFileToFileScopedDeletesBroadcast =

Review Comment:
   We should avoid the broadcast if there are no matching deletes to rewrite.
   Also, let's move this logic into a separate method.
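   
   Roughly (sketch of a dedicated helper; the names are assumptions):
   
   ```
   private Broadcast<Map<String, DeleteFileSet>> broadcastRewritableDeletes() {
     Map<String, DeleteFileSet> rewritableDeletes = scan.rewritableDeletes();
     // skip the broadcast entirely when there is nothing to rewrite
     return rewritableDeletes != null ? sparkContext.broadcast(rewritableDeletes) : null;
   }
   ```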


