amogh-jahagirdar commented on code in PR #11693:
URL: https://github.com/apache/iceberg/pull/11693#discussion_r1868316838


##########
core/src/test/java/org/apache/iceberg/FileGenerationUtil.java:
##########
@@ -102,14 +102,18 @@ public static DeleteFile generateEqualityDeleteFile(Table table, StructLike part
   }
 
   public static DeleteFile generateDV(Table table, DataFile dataFile) {
+    return generateDV(table, dataFile, "/path/to/delete-" + UUID.randomUUID() + ".puffin");

Review Comment:
   No longer needed, remove 



##########
core/src/test/java/org/apache/iceberg/TestRowDelta.java:
##########
@@ -1548,23 +1559,138 @@ public void testConcurrentMergeRewriteSameDeleteFile() {
   }
 
   @TestTemplate
-  public void testConcurrentDVsForSameDataFile() {
+  public void testConcurrentDVsForSameDataFile() throws IOException {
     assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
 
     DataFile dataFile = newDataFile("data_bucket=0");
     commit(table, table.newRowDelta().addRows(dataFile), branch);
+    List<PositionDelete<?>> deletes = Lists.newArrayList();
+    // Delete the first 4 positions in dataFile
+    for (int i = 0; i < 4; i++) {
+      deletes.add(PositionDelete.create().set(dataFile.location(), i));
+    }
 
-    DeleteFile deleteFile1 = newDeletes(dataFile);
+    OutputFileFactory fileFactory =
+        OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
+    DeleteFile deleteFile1 = writeDV(deletes, dataFile.partition(), fileFactory);
     RowDelta rowDelta1 = table.newRowDelta().addDeletes(deleteFile1);
 
-    DeleteFile deleteFile2 = newDeletes(dataFile);
+    List<PositionDelete<?>> conflictingDeletes = Lists.newArrayList();
+
+    // Delete positions 4 through 7
+    for (int i = 4; i < 8; i++) {
+      conflictingDeletes.add(PositionDelete.create().set(dataFile.location(), i));
+    }
+
+    DeleteFile deleteFile2 = writeDV(conflictingDeletes, dataFile.partition(), fileFactory);
     RowDelta rowDelta2 = table.newRowDelta().addDeletes(deleteFile2);
 
     commit(table, rowDelta1, branch);
+    commit(table, rowDelta2, branch);
 
-    assertThatThrownBy(() -> commit(table, rowDelta2, branch))
-        .isInstanceOf(ValidationException.class)
-        .hasMessageContaining("Found concurrently added DV for %s", dataFile.location());
+    Set<DeleteFile> dvs =
+        deleteFiles(table, branch).stream()
+            .filter(ContentFileUtil::isDV)
+            .collect(Collectors.toSet());
+    assertThat(dvs).as("There should be exactly 1 DV").hasSize(1);
+    DeleteFile dv = dvs.iterator().next();
+    assertThat(dv.recordCount()).as("The cardinality of the DV should be 8").isEqualTo(8);
+    PositionDeleteIndex positionDeleteIndex = readDV(table, dv);
+    for (int i = 0; i < 8; i++) {
+      assertThat(positionDeleteIndex.isDeleted(i))
+          .as("Expected position " + i + " to be deleted")
+          .isTrue();
+    }
+  }
+
+  @TestTemplate
+  public void testConcurrentDVsForMultipleDataFiles() throws IOException {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    DataFile dataFile = newDataFile("data_bucket=0");
+    DataFile dataFile2 = newDataFile("data_bucket=1");
+    commit(table, table.newRowDelta().addRows(dataFile).addRows(dataFile2), branch);
+    List<PositionDelete<?>> deletes = Lists.newArrayList();
+    List<PositionDelete<?>> deletesForDataFile2 = Lists.newArrayList();
+
+    // Delete the first 4 positions in dataFile and dataFile2
+    for (int i = 0; i < 4; i++) {
+      deletes.add(PositionDelete.create().set(dataFile.location(), i));
+      deletesForDataFile2.add(PositionDelete.create().set(dataFile2.location(), i));
+    }
+
+    OutputFileFactory fileFactory =
+        OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
+    DeleteFile deleteFile1 = writeDV(deletes, dataFile.partition(), fileFactory);
+    DeleteFile deleteFile2 = writeDV(deletesForDataFile2, dataFile2.partition(), fileFactory);
+    RowDelta rowDelta1 = table.newRowDelta().addDeletes(deleteFile1).addDeletes(deleteFile2);
+
+    List<PositionDelete<?>> conflictingDeletesForDataFile = Lists.newArrayList();
+    List<PositionDelete<?>> conflictingDeletesForDataFile2 = Lists.newArrayList();
+
+    // Delete positions 4 through 7 in dataFile
+    for (int i = 4; i < 8; i++) {
+      conflictingDeletesForDataFile.add(PositionDelete.create().set(dataFile.location(), i));
+      conflictingDeletesForDataFile2.add(PositionDelete.create().set(dataFile2.location(), i));
+    }
+
+    DeleteFile conflictingDeletes =
+        writeDV(conflictingDeletesForDataFile, dataFile.partition(), fileFactory);
+    DeleteFile conflictingDeletes2 =
+        writeDV(conflictingDeletesForDataFile2, dataFile2.partition(), fileFactory);
+
+    RowDelta rowDelta2 =
+        table.newRowDelta().addDeletes(conflictingDeletes).addDeletes(conflictingDeletes2);
+
+    commit(table, rowDelta1, branch);
+    commit(table, rowDelta2, branch);
+
+    Set<DeleteFile> dvs =
+        deleteFiles(table, branch).stream()
+            .filter(ContentFileUtil::isDV)
+            .collect(Collectors.toSet());
+    assertThat(dvs).as("There should be exactly 2 DVs").hasSize(2);
+    for (DeleteFile dv : dvs) {
+      PositionDeleteIndex positionDeleteIndex = readDV(table, dv);
+      assertThat(dv.recordCount()).as("The cardinality of the DV should be 8").isEqualTo(8);
+      for (int i = 0; i < 8; i++) {
+        assertThat(positionDeleteIndex.isDeleted(i))
+            .as("Expected position " + i + " to be deleted")
+            .isTrue();
+      }
+    }
+  }
+
+  private static Set<DeleteFile> deleteFiles(Table table, String ref) {
+    DeleteFileSet deleteFiles = DeleteFileSet.create();
+
+    for (FileScanTask task : table.newScan().useRef(ref).planFiles()) {
+      deleteFiles.addAll(task.deletes());
+    }
+
+    return deleteFiles;
+  }
+
+  private DeleteFile writeDV(
+      List<PositionDelete<?>> deletes, StructLike partition, OutputFileFactory fileFactory)
+      throws IOException {
+
+    DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null);
+    try (DVFileWriter closeableWriter = writer) {
+      for (PositionDelete<?> delete : deletes) {
+        closeableWriter.delete(delete.path().toString(), delete.pos(), table.spec(), partition);
+      }
+    }
+
+    return Iterables.getOnlyElement(writer.result().deleteFiles());
+  }
+
+  private PositionDeleteIndex readDV(Table table, DeleteFile dv) {

Review Comment:
   Duplicate method, maybe we should add this to `Deletes`? 
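   For reference, something like the following could live in a shared helper (untested sketch; assumes `DeleteFile#contentOffset()`/`contentSizeInBytes()` are set for DVs and that `PositionDeleteIndex.deserialize(byte[], DeleteFile)` is usable from this module):
   
```java
// Untested sketch of a shared DV-reading helper (e.g. in Deletes or a test util).
// Assumes contentOffset()/contentSizeInBytes() point at the DV blob inside the Puffin file
// and that PositionDeleteIndex.deserialize(byte[], DeleteFile) is available here.
public static PositionDeleteIndex readDV(FileIO io, DeleteFile dv) {
  Preconditions.checkArgument(ContentFileUtil.isDV(dv), "Not a DV: %s", dv.location());
  InputFile inputFile = io.newInputFile(dv.location());
  byte[] bytes = new byte[dv.contentSizeInBytes().intValue()];
  try (SeekableInputStream stream = inputFile.newStream()) {
    // read only the DV blob at its known offset/length
    stream.seek(dv.contentOffset());
    IOUtil.readFully(stream, bytes, 0, bytes.length);
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
  return PositionDeleteIndex.deserialize(bytes, dv);
}
```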



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -823,35 +833,138 @@ protected void validateAddedDVs(
             parent);
     List<ManifestFile> newDeleteManifests = history.first();
     Set<Long> newSnapshotIds = history.second();
-
+    Map<String, DeleteFileSet> committedDVsForDataFile = Maps.newConcurrentMap();
     Tasks.foreach(newDeleteManifests)
         .stopOnFailure()
         .throwFailureWhenFinished()
         .executeWith(workerPool())
-        .run(manifest -> validateAddedDVs(manifest, conflictDetectionFilter, newSnapshotIds));
+        .run(
+            manifest -> {
+              try (CloseableIterable<ManifestEntry<DeleteFile>> entries =
+                  ManifestFiles.readDeleteManifest(
+                          manifest, ops().io(), ops().current().specsById())
+                      .filterRows(conflictDetectionFilter)
+                      .caseSensitive(caseSensitive)
+                      .liveEntries()) {
+                for (ManifestEntry<DeleteFile> entry : entries) {
+                  DeleteFile file = entry.file().copyWithoutStats();
+                  if (newSnapshotIds.contains(entry.snapshotId()) && ContentFileUtil.isDV(file)) {
+                    if (newDVRefs.contains(file.referencedDataFile())) {
+                      committedDVsForDataFile
+                          .computeIfAbsent(
+                              file.referencedDataFile(), ignored -> DeleteFileSet.create())
+                          .add(file);
+                    }
+                  }
+                }
+              } catch (IOException e) {
+                throw new RuntimeIOException(e);
+              }
+            });
+
+    if (!committedDVsForDataFile.isEmpty()) {
+      mergeConflictingDVs(committedDVsForDataFile);
+    }
   }
 
-  private void validateAddedDVs(
-      ManifestFile manifest, Expression conflictDetectionFilter, Set<Long> newSnapshotIds) {
-    try (CloseableIterable<ManifestEntry<DeleteFile>> entries =
-        ManifestFiles.readDeleteManifest(manifest, ops().io(), ops().current().specsById())
-            .filterRows(conflictDetectionFilter)
-            .caseSensitive(caseSensitive)
-            .liveEntries()) {
-
-      for (ManifestEntry<DeleteFile> entry : entries) {
-        DeleteFile file = entry.file();
-        if (newSnapshotIds.contains(entry.snapshotId()) && ContentFileUtil.isDV(file)) {
-          ValidationException.check(
-              !newDVRefs.contains(file.referencedDataFile()),
-              "Found concurrently added DV for %s: %s",
-              file.referencedDataFile(),
-              ContentFileUtil.dvDesc(file));
-        }
-      }
+  private void mergeConflictingDVs(Map<String, DeleteFileSet> committedDVsForDataFile) {
+    Set<MergedDVResult> mergedDVResults = Sets.newConcurrentHashSet();
+    Tasks.foreach(committedDVsForDataFile.entrySet())
+        .executeWith(ThreadPools.getDeleteWorkerPool())
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .run(committedDVs -> mergedDVResults.add(mergeAndWriteDV(committedDVs)));
+    // replace the currently conflicting DVs with the merged DV
+    mergedDVResults.forEach(this::replaceConflictingWithMergedDV);
+  }
+
+  private MergedDVResult mergeAndWriteDV(Map.Entry<String, DeleteFileSet> committedDVsForDataFile) {
+    String dataFile = committedDVsForDataFile.getKey();
+    DeleteFile committedDV = committedDVsForDataFile.getValue().iterator().next();
+    int specId = committedDV.specId();
+    PartitionSpec spec = spec(specId);
+    DeleteFileSet conflictingDVs = dvsWithReferencedDataFile(spec, dataFile);
+    PositionDeleteIndex mergedPositions =
+        mergeConflictingDVs(Iterables.concat(conflictingDVs, committedDVsForDataFile.getValue()));
+    try {
+      DVFileWriter dvFileWriter =
+          new BaseDVFileWriter(
+              OutputFileFactory.builderFor(ops(), spec(specId), FileFormat.PUFFIN, 1, 1).build(),
+              path -> null);
+      dvFileWriter.delete(dataFile, mergedPositions, spec, committedDV.partition());
+      dvFileWriter.close();
+      DeleteWriteResult result = dvFileWriter.result();
+      DeleteFile mergedDV = Iterables.getOnlyElement(result.deleteFiles());
+      return new MergedDVResult(
+          new PendingDeleteFile(mergedDV), committedDVsForDataFile.getValue(), conflictingDVs);
     } catch (IOException e) {
-      throw new UncheckedIOException(e);
+      throw new RuntimeIOException(e);

Review Comment:
   UncheckedIOException? 
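   i.e. keep the exception type the original code used, something like:
   
```java
    } catch (IOException e) {
      // suggestion: keep wrapping in java.io.UncheckedIOException as before,
      // rather than switching to RuntimeIOException
      throw new UncheckedIOException(e);
    }
```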



##########
core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java:
##########
@@ -36,6 +36,14 @@ public interface DVFileWriter extends Closeable {
    */
   void delete(String path, long pos, PartitionSpec spec, StructLike partition);
 
+  default void delete(

Review Comment:
   Needs Javadoc; it's also worth explaining the alternative, which is calling `delete` for every position. That said, I think it's worth introducing an API on the writer that takes an in-memory index and writes out all the deleted positions.
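   Rough sketch of what that could look like (assumes `PositionDeleteIndex#forEach` is usable here; names are illustrative):
   
```java
  /**
   * Marks all positions in the given index as deleted for the data file at {@code path}.
   *
   * <p>Equivalent to calling {@link #delete(String, long, PartitionSpec, StructLike)} once per
   * deleted position, but lets callers pass an in-memory index directly.
   *
   * @param path location of the referenced data file
   * @param positions in-memory index of deleted positions
   * @param spec partition spec of the referenced data file
   * @param partition partition of the referenced data file
   */
  default void delete(String path, PositionDeleteIndex positions, PartitionSpec spec, StructLike partition) {
    // delegate to the single-position API for each deleted position
    positions.forEach(pos -> delete(path, pos, spec, partition));
  }
```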



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

