aokolnychyi commented on code in PR #11273:
URL: https://github.com/apache/iceberg/pull/11273#discussion_r1799857526


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -169,7 +174,13 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
       // broadcast the table metadata as the writer factory will be sent to executors
       Broadcast<Table> tableBroadcast =
           sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
-      return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties);
+      Broadcast<Map<String, DeleteFileSet>> rewritableDeletes = null;
+      if (context.deleteGranularity() == DeleteGranularity.FILE && scan != null) {
+        rewritableDeletes = sparkContext.broadcast(scan.rewritableDeletes());

Review Comment:
   We should avoid the broadcast if the set of rewritable deletes is empty/null. I'd also move this into a helper method and modify the comment/invocation. Something like this:
   
   ```
   @Override
   public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
     // broadcast large objects as the writer factory will be sent to executors
     return new PositionDeltaWriteFactory(
         sparkContext.broadcast(SerializableTableWithSize.copyOf(table)),
         broadcastRewritableDeletes(),
         ...
   }
   
   private Broadcast<Map<String, DeleteFileSet>> broadcastRewritableDeletes() {
     if (context.deleteGranularity() == DeleteGranularity.FILE && scan != null) {
       Map<String, DeleteFileSet> rewritableDeletes = scan.rewritableDeletes();
       if (rewritableDeletes != null && !rewritableDeletes.isEmpty()) {
         return sparkContext.broadcast(rewritableDeletes);
       }
     }
   
     return null;
   }
   ```
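   FWIW, the factory could then just store the (possibly null) broadcast as-is. A rough sketch, assuming the existing constructor parameters plus the new broadcast (field and parameter names here are guesses, not the actual PR code):
   
   ```
   private PositionDeltaWriteFactory(
       Broadcast<Table> tableBroadcast,
       Broadcast<Map<String, DeleteFileSet>> rewritableDeletesBroadcast,
       Command command,
       Context context,
       Map<String, String> writeProperties) {
     this.tableBroadcast = tableBroadcast;
     this.rewritableDeletesBroadcast = rewritableDeletesBroadcast; // may be null
     this.command = command;
     this.context = context;
     this.writeProperties = writeProperties;
   }
   ```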



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java:
##########
@@ -158,6 +162,22 @@ public void filter(Predicate[] predicates) {
     }
   }
 
+  protected Map<String, DeleteFileSet> rewritableDeletes() {
+    Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();
+    for (ScanTask task : tasks()) {

Review Comment:
   Minor: I'd say either add an empty line before `for` or remove the one after it, for consistency. Otherwise, it is not clear which logical block we are separating.
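   
   For instance, with the empty line before `for` (purely illustrative, as the loop body is elided in this hunk):
   
   ```
   protected Map<String, DeleteFileSet> rewritableDeletes() {
     Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();
   
     for (ScanTask task : tasks()) {
       // collect the rewritable delete files for each referenced data file
     }
   
     return rewritableDeletes;
   }
   ```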



##########
core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java:
##########
@@ -80,4 +80,8 @@ public static String referencedDataFileLocation(DeleteFile deleteFile) {
     CharSequence location = referencedDataFile(deleteFile);
     return location != null ? location.toString() : null;
   }
+
+  public static boolean isFileScopedDelete(DeleteFile deleteFile) {

Review Comment:
   Optional: I feel it reads well even without `Delete` in the name, and it makes for shorter lines.
   
   ```
   if (ContentFileUtil.isFileScoped(deleteFile)) {
     ...
   }
   ```
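   A minimal sketch of the shortened helper, assuming it just checks whether a referenced data file is recorded (the actual body is not shown in this hunk):
   
   ```
   public static boolean isFileScoped(DeleteFile deleteFile) {
     return referencedDataFile(deleteFile) != null;
   }
   ```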



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -361,6 +389,10 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory {
     @Override
     public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
       Table table = tableBroadcast.value();
+      Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();

Review Comment:
   What about a helper method and using it directly in statements below?
   
   ```
   private Map<String, DeleteFileSet> rewritableDeletes() {
     return rewritableDeletesBroadcast != null ? rewritableDeletesBroadcast.value() : null;
   }
   ```
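   Then `createWriter` could read it inline; a sketch with the surrounding writer construction elided:
   
   ```
   @Override
   public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
     Table table = tableBroadcast.value();
     Map<String, DeleteFileSet> rewritableDeletes = rewritableDeletes(); // may be null
     // ... build and return the delta writer from table and rewritableDeletes as before ...
   }
   ```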



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -437,23 +470,52 @@ protected PartitioningWriter<InternalRow, DataWriteResult> newDataWriter(
     // use a fanout writer if the input is unordered no matter whether fanout writers are enabled
     // clustered writers assume that the position deletes are already ordered by file and position
     protected PartitioningWriter<PositionDelete<InternalRow>, DeleteWriteResult> newDeleteWriter(
-        Table table, SparkFileWriterFactory writers, OutputFileFactory files, Context context) {
+        Table table,
+        Map<String, DeleteFileSet> rewritableDeletes,
+        SparkFileWriterFactory writers,
+        OutputFileFactory files,
+        Context context) {
 
       FileIO io = table.io();
       boolean inputOrdered = context.inputOrdered();
       long targetFileSize = context.targetDeleteFileSize();
       DeleteGranularity deleteGranularity = context.deleteGranularity();
 
-      if (inputOrdered) {
+      if (inputOrdered && rewritableDeletes.isEmpty()) {
         return new ClusteredPositionDeleteWriter<>(
             writers, files, io, targetFileSize, deleteGranularity);
       } else {
         return new FanoutPositionOnlyDeleteWriter<>(
-            writers, files, io, targetFileSize, deleteGranularity);
+            writers,
+            files,
+            io,
+            targetFileSize,
+            deleteGranularity,
+            new PreviousDeleteLoader(table, rewritableDeletes));
       }
     }
   }
 
+  private static class PreviousDeleteLoader implements Function<CharSequence, PositionDeleteIndex> {
+    private final Map<String, DeleteFileSet> deleteFiles;

Review Comment:
   Would naming this var `rewritableDeletes` make sense, for clarity?
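   
   Sketch (only the field rename, the rest stays as in the PR):
   
   ```
   private static class PreviousDeleteLoader implements Function<CharSequence, PositionDeleteIndex> {
     private final Map<String, DeleteFileSet> rewritableDeletes;
     ...
   }
   ```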


