aokolnychyi commented on code in PR #11273:
URL: https://github.com/apache/iceberg/pull/11273#discussion_r1792506398
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java: ##########
@@ -158,6 +163,26 @@ public void filter(Predicate[] predicates) {
     }
   }

+  protected Map<String, DeleteFileSet> dataToFileScopedDeletes() {
+    if (dataToFileScopedDeletes == null) {
+      dataToFileScopedDeletes = Maps.newHashMap();
+      for (ScanTask task : tasks()) {
+        FileScanTask fileScanTask = task.asFileScanTask();
+        List<DeleteFile> fileScopedDeletes =
+            fileScanTask.deletes().stream()
+                .filter(file -> ContentFileUtil.referencedDataFileLocation(file) != null)
+                .collect(Collectors.toList());

Review Comment:
   What about avoiding the extra list that holds temporary state by using a for loop? Streams also add overhead.
   ```
   for (ScanTask task : tasks()) {
     FileScanTask fileTask = task.asFileScanTask();
     for (DeleteFile deleteFile : fileTask.deletes()) {
       if (ContentFileUtil.isFileScoped(deleteFile)) {
         rewritableDeletes
             .computeIfAbsent(fileTask.file().location(), ignored -> DeleteFileSet.create())
             .add(deleteFile);
       }
     }
   }
   ```

########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java: ##########
@@ -158,6 +163,26 @@ public void filter(Predicate[] predicates) {
     }
   }

+  protected Map<String, DeleteFileSet> dataToFileScopedDeletes() {

Review Comment:
   Let's use a more generic name like `rewritableDeletes`. We will use it for V3 tables as well, where we will need to rewrite partition-scoped deletes during the first V3 write.

########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java: ##########
@@ -68,6 +72,7 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
   private final Long asOfTimestamp;
   private final String tag;
   private final List<Expression> runtimeFilterExpressions;
+  private Map<String, DeleteFileSet> dataToFileScopedDeletes;

Review Comment:
   Don't we access it only once? Why keep a reference to it in the scan?

########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java: ##########
@@ -158,6 +163,26 @@ public void filter(Predicate[] predicates) {
     }
   }

+  protected Map<String, DeleteFileSet> dataToFileScopedDeletes() {
+    if (dataToFileScopedDeletes == null) {
+      dataToFileScopedDeletes = Maps.newHashMap();
+      for (ScanTask task : tasks()) {
+        FileScanTask fileScanTask = task.asFileScanTask();
+        List<DeleteFile> fileScopedDeletes =
+            fileScanTask.deletes().stream()
+                .filter(file -> ContentFileUtil.referencedDataFileLocation(file) != null)
+                .collect(Collectors.toList());
+        for (DeleteFile deleteFile : fileScopedDeletes) {
+          dataToFileScopedDeletes
+              .computeIfAbsent(fileScanTask.file().location(), ignored -> DeleteFileSet.create())
+              .add(deleteFile);
+        }
+      }
+    }
+
+    return dataToFileScopedDeletes;

Review Comment:
   I'd also consider returning null and not doing a broadcast if we didn't find any matching deletes to rewrite.
   ```
   return rewritableDeletes.isEmpty() ? null : rewritableDeletes;
   ```
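   For illustration, the method could look roughly like this once the suggestions above are combined (a sketch only; the name `rewritableDeletes` and the `ContentFileUtil.isFileScoped` helper are proposals from this review, not existing APIs):
   ```
   // Sketch: combines the for-loop, no-field, and null-on-empty suggestions.
   // ContentFileUtil.isFileScoped is the helper proposed later in this review.
   protected Map<String, DeleteFileSet> rewritableDeletes() {
     Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();

     for (ScanTask task : tasks()) {
       FileScanTask fileTask = task.asFileScanTask();
       for (DeleteFile deleteFile : fileTask.deletes()) {
         if (ContentFileUtil.isFileScoped(deleteFile)) {
           rewritableDeletes
               .computeIfAbsent(fileTask.file().location(), ignored -> DeleteFileSet.create())
               .add(deleteFile);
         }
       }
     }

     // avoid broadcasting an empty map downstream
     return rewritableDeletes.isEmpty() ? null : rewritableDeletes;
   }
   ```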
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java: ##########
@@ -336,31 +356,40 @@ DeleteFile[] deleteFiles() {
       return deleteFiles;
     }

+    DeleteFile[] rewrittenDeleteFiles() {
+      return rewrittenDeleteFiles;
+    }

     CharSequence[] referencedDataFiles() {
       return referencedDataFiles;
     }
   }

   private static class PositionDeltaWriteFactory implements DeltaWriterFactory {
     private final Broadcast<Table> tableBroadcast;
+    private final Broadcast<Map<String, DeleteFileSet>> dataFileToFileScopedDeletesBroadcast;
     private final Command command;
     private final Context context;
     private final Map<String, String> writeProperties;

     PositionDeltaWriteFactory(
         Broadcast<Table> tableBroadcast,
+        Broadcast<Map<String, DeleteFileSet>> dataFileToFileScopedDeletesBroadcast,
         Command command,
         Context context,
         Map<String, String> writeProperties) {
       this.tableBroadcast = tableBroadcast;
       this.command = command;
       this.context = context;
       this.writeProperties = writeProperties;
+      this.dataFileToFileScopedDeletesBroadcast = dataFileToFileScopedDeletesBroadcast;

Review Comment:
   Minor: let's match the field declaration order when we initialize variables in the constructor.

########## core/src/main/java/org/apache/iceberg/TableProperties.java: ##########
@@ -383,4 +383,8 @@ private TableProperties() {}
   public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16;
   public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16;
+
+  public static final String MAINTAIN_POSITION_DELETES_DURING_WRITE =

Review Comment:
   I am not sure we need this configuration. It is pretty clear we want to always maintain position deletes. We will also not support this property in V3. Given that we want to switch to file-scoped position deletes in V2 tables by default, I think we should just always maintain deletes.

########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java: ##########
@@ -158,6 +163,26 @@ public void filter(Predicate[] predicates) {
     }
   }

+  protected Map<String, DeleteFileSet> dataToFileScopedDeletes() {
+    if (dataToFileScopedDeletes == null) {
+      dataToFileScopedDeletes = Maps.newHashMap();
+      for (ScanTask task : tasks()) {
+        FileScanTask fileScanTask = task.asFileScanTask();
+        List<DeleteFile> fileScopedDeletes =
+            fileScanTask.deletes().stream()
+                .filter(file -> ContentFileUtil.referencedDataFileLocation(file) != null)

Review Comment:
   It looks like we will need this check in a few places. What about adding `isFileScoped` to `ContentFileUtil`? I used the same logic in the fanout writer PR.
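   For reference, a minimal sketch of what such a helper might look like in `ContentFileUtil`, assuming it simply mirrors the filter used above (this method does not exist yet):
   ```
   // Hypothetical helper: a delete file is file-scoped when it references exactly
   // one data file, which referencedDataFileLocation already tells us.
   public static boolean isFileScoped(DeleteFile deleteFile) {
     return referencedDataFileLocation(deleteFile) != null;
   }
   ```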
########## core/src/main/java/org/apache/iceberg/TableProperties.java: ##########
@@ -383,4 +383,8 @@ private TableProperties() {}
   public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16;
   public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16;
+
+  public static final String MAINTAIN_POSITION_DELETES_DURING_WRITE =

Review Comment:
   In other words, switching `write.delete.granularity` to `file` should trigger maintenance. If it is set to `partition`, skip the maintenance, as we can't do it safely.

########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java: ##########
@@ -185,6 +196,7 @@ public void commit(WriterCommitMessage[] messages) {
     int addedDataFilesCount = 0;
     int addedDeleteFilesCount = 0;
+    int removedDeleteFilesCount = 0;

Review Comment:
   Is this actually being used? I think it is very helpful, but we have to update the commit/log message template.

########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java: ##########
@@ -449,8 +493,33 @@ protected PartitioningWriter<PositionDelete<InternalRow>, DeleteWriteResult> new
             writers, files, io, targetFileSize, deleteGranularity);
       } else {
         return new FanoutPositionOnlyDeleteWriter<>(
-            writers,

Review Comment:
   We have to use `FanoutPositionOnlyDeleteWriter` if the `Map<String, DeleteFileSet>` is not null, even if the input is ordered.

########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java: ##########
@@ -169,7 +174,13 @@ public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
     // broadcast the table metadata as the writer factory will be sent to executors
     Broadcast<Table> tableBroadcast =
         sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
-    return new PositionDeltaWriteFactory(tableBroadcast, command, context, writeProperties);
+    Broadcast<Map<String, DeleteFileSet>> dataFileToFileScopedDeletesBroadcast =

Review Comment:
   We should avoid the broadcast if there are no matching deletes to rewrite. Also, let's move this logic into a separate method.
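   As an illustration of the suggested extraction, the write could build the broadcast in a small helper along these lines (a sketch only; it assumes the write keeps a reference to the scan and that the scan exposes the `rewritableDeletes()` accessor proposed above):
   ```
   // Sketch of the extracted helper, not the actual implementation.
   // Assumes `scan` and `sparkContext` are available on the write, and that
   // rewritableDeletes() returns null when there is nothing to rewrite.
   private Broadcast<Map<String, DeleteFileSet>> broadcastRewritableDeletes() {
     Map<String, DeleteFileSet> rewritableDeletes = scan.rewritableDeletes();
     // skip the broadcast entirely when no file-scoped deletes need rewriting
     return rewritableDeletes != null ? sparkContext.broadcast(rewritableDeletes) : null;
   }
   ```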