aokolnychyi commented on code in PR #11273:
URL: https://github.com/apache/iceberg/pull/11273#discussion_r1799857526

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -169,7 +174,13 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
     // broadcast the table metadata as the writer factory will be sent to executors
     Broadcast<Table> tableBroadcast =
         sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
-    return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties);
+    Broadcast<Map<String, DeleteFileSet>> rewritableDeletes = null;
+    if (context.deleteGranularity() == DeleteGranularity.FILE && scan != null) {
+      rewritableDeletes = sparkContext.broadcast(scan.rewritableDeletes());

Review Comment:
We should avoid the broadcast if the set of rewritable deletes is empty/null. I'd also move this into a helper method and modify the comment/invocation. Something like:
```
@Override
public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
  // broadcast large objects as the writer factory will be sent to executors
  return new PositionDeltaWriteFactory(
      sparkContext.broadcast(SerializableTableWithSize.copyOf(table)),
      broadcastRewritableDeletes(),
      ...
}

private Broadcast<Map<String, DeleteFileSet>> broadcastRewritableDeletes() {
  if (context.deleteGranularity() == DeleteGranularity.FILE && scan != null) {
    Map<String, DeleteFileSet> rewritableDeletes = scan.rewritableDeletes();
    if (rewritableDeletes != null && !rewritableDeletes.isEmpty()) {
      return sparkContext.broadcast(rewritableDeletes);
    }
  }
  return null;
}
```

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java:
##########
@@ -158,6 +162,22 @@ public void filter(Predicate[] predicates) {
     }
   }

+  protected Map<String, DeleteFileSet> rewritableDeletes() {
+    Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();
+    for (ScanTask task : tasks()) {

Review Comment:
Minor: I'd either add an empty line before `for` or remove the one after, for consistency. Otherwise, it is not clear what logical block we are separating.

##########
core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java:
##########
@@ -80,4 +80,8 @@ public static String referencedDataFileLocation(DeleteFile deleteFile) {
     CharSequence location = referencedDataFile(deleteFile);
     return location != null ? location.toString() : null;
   }
+
+  public static boolean isFileScopedDelete(DeleteFile deleteFile) {

Review Comment:
Optional: I feel it reads well even without `delete`, and it makes for shorter lines.
```
if (ContentFileUtil.isFileScoped(deleteFile)) {
  ...
}
```

##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -361,6 +389,10 @@ private static class PositionDeltaWriteFactory implements DeltaWriterFactory {

     @Override
     public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
       Table table = tableBroadcast.value();
+      Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();

Review Comment:
What about a helper method and using it directly in the statements below?
```
private Map<String, DeleteFileSet> rewritableDeletes() {
  return rewritableDeletesBroadcast != null ? rewritableDeletesBroadcast.value() : null;
}
```
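
Taken together, the two suggestions above amount to one pattern: only pay the broadcast cost on the driver when there is something to ship, and let executors treat a missing broadcast as "nothing to rewrite". Below is a minimal, self-contained illustration of that pattern with plain Spark types; the class name and the `Integer` stand-in for `DeleteFileSet` are illustrative, not from the PR.
```
import java.util.Collections;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastGuardDemo {
  public static void main(String[] args) {
    JavaSparkContext sc =
        new JavaSparkContext(new SparkConf().setAppName("demo").setMaster("local[1]"));

    // stand-in for scan.rewritableDeletes(); empty in this run
    Map<String, Integer> rewritableDeletes = Collections.emptyMap();

    // driver side: skip the broadcast entirely when there is nothing to ship
    Broadcast<Map<String, Integer>> broadcast =
        rewritableDeletes.isEmpty() ? null : sc.broadcast(rewritableDeletes);

    // executor side: unwrap null-safely; null means "no deletes to rewrite"
    Map<String, Integer> onExecutor = broadcast != null ? broadcast.value() : null;
    System.out.println(onExecutor == null ? "no rewritable deletes" : onExecutor.toString());

    sc.stop();
  }
}
```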
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -437,23 +470,52 @@ protected PartitioningWriter<InternalRow, DataWriteResult> newDataWriter(
     // use a fanout writer if the input is unordered no matter whether fanout writers are enabled
     // clustered writers assume that the position deletes are already ordered by file and position
     protected PartitioningWriter<PositionDelete<InternalRow>, DeleteWriteResult> newDeleteWriter(
-        Table table, SparkFileWriterFactory writers, OutputFileFactory files, Context context) {
+        Table table,
+        Map<String, DeleteFileSet> rewritableDeletes,
+        SparkFileWriterFactory writers,
+        OutputFileFactory files,
+        Context context) {
       FileIO io = table.io();
       boolean inputOrdered = context.inputOrdered();
       long targetFileSize = context.targetDeleteFileSize();
       DeleteGranularity deleteGranularity = context.deleteGranularity();
-      if (inputOrdered) {
+      if (inputOrdered && rewritableDeletes.isEmpty()) {
         return new ClusteredPositionDeleteWriter<>(
             writers, files, io, targetFileSize, deleteGranularity);
       } else {
         return new FanoutPositionOnlyDeleteWriter<>(
-            writers, files, io, targetFileSize, deleteGranularity);
+            writers,
+            files,
+            io,
+            targetFileSize,
+            deleteGranularity,
+            new PreviousDeleteLoader(table, rewritableDeletes));
       }
     }
   }

+  private static class PreviousDeleteLoader implements Function<CharSequence, PositionDeleteIndex> {
+    private final Map<String, DeleteFileSet> deleteFiles;

Review Comment:
Would naming this var `rewritableDeletes` make sense, for clarity?
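
One knock-on effect worth noting: if the factory-side helper suggested earlier can return null, the `rewritableDeletes.isEmpty()` check in this dispatch would throw a NullPointerException. A null-safe variant of the method from the diff might look like the sketch below; this is an adaptation under that assumption, not code from the PR, and `PreviousDeleteLoader` would likewise need to tolerate a null map.
```
protected PartitioningWriter<PositionDelete<InternalRow>, DeleteWriteResult> newDeleteWriter(
    Table table,
    Map<String, DeleteFileSet> rewritableDeletes,
    SparkFileWriterFactory writers,
    OutputFileFactory files,
    Context context) {
  FileIO io = table.io();
  boolean inputOrdered = context.inputOrdered();
  long targetFileSize = context.targetDeleteFileSize();
  DeleteGranularity deleteGranularity = context.deleteGranularity();
  // treat "no broadcast created" (null) the same as "no rewritable deletes" (empty)
  boolean noRewritableDeletes = rewritableDeletes == null || rewritableDeletes.isEmpty();
  if (inputOrdered && noRewritableDeletes) {
    return new ClusteredPositionDeleteWriter<>(
        writers, files, io, targetFileSize, deleteGranularity);
  } else {
    return new FanoutPositionOnlyDeleteWriter<>(
        writers, files, io, targetFileSize, deleteGranularity,
        new PreviousDeleteLoader(table, rewritableDeletes));
  }
}
```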