amogh-jahagirdar commented on code in PR #11693:
URL: https://github.com/apache/iceberg/pull/11693#discussion_r1868322612


##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -823,35 +833,138 @@ protected void validateAddedDVs(
             parent);
     List<ManifestFile> newDeleteManifests = history.first();
     Set<Long> newSnapshotIds = history.second();
-
+    Map<String, DeleteFileSet> committedDVsForDataFile = 
Maps.newConcurrentMap();
     Tasks.foreach(newDeleteManifests)
         .stopOnFailure()
         .throwFailureWhenFinished()
         .executeWith(workerPool())
-        .run(manifest -> validateAddedDVs(manifest, conflictDetectionFilter, 
newSnapshotIds));
+        .run(
+            manifest -> {
+              try (CloseableIterable<ManifestEntry<DeleteFile>> entries =
+                  ManifestFiles.readDeleteManifest(
+                          manifest, ops().io(), ops().current().specsById())
+                      .filterRows(conflictDetectionFilter)
+                      .caseSensitive(caseSensitive)
+                      .liveEntries()) {
+                for (ManifestEntry<DeleteFile> entry : entries) {
+                  DeleteFile file = entry.file().copyWithoutStats();
+                  if (newSnapshotIds.contains(entry.snapshotId()) && 
ContentFileUtil.isDV(file)) {
+                    if (newDVRefs.contains(file.referencedDataFile())) {
+                      committedDVsForDataFile
+                          .computeIfAbsent(
+                              file.referencedDataFile(), ignored -> 
DeleteFileSet.create())
+                          .add(file);
+                    }
+                  }
+                }
+              } catch (IOException e) {
+                throw new RuntimeIOException(e);
+              }
+            });
+
+    if (!committedDVsForDataFile.isEmpty()) {
+      mergeConflictingDVs(committedDVsForDataFile);
+    }
   }
 
-  private void validateAddedDVs(
-      ManifestFile manifest, Expression conflictDetectionFilter, Set<Long> 
newSnapshotIds) {
-    try (CloseableIterable<ManifestEntry<DeleteFile>> entries =
-        ManifestFiles.readDeleteManifest(manifest, ops().io(), 
ops().current().specsById())
-            .filterRows(conflictDetectionFilter)
-            .caseSensitive(caseSensitive)
-            .liveEntries()) {
-
-      for (ManifestEntry<DeleteFile> entry : entries) {
-        DeleteFile file = entry.file();
-        if (newSnapshotIds.contains(entry.snapshotId()) && 
ContentFileUtil.isDV(file)) {
-          ValidationException.check(
-              !newDVRefs.contains(file.referencedDataFile()),
-              "Found concurrently added DV for %s: %s",
-              file.referencedDataFile(),
-              ContentFileUtil.dvDesc(file));
-        }
-      }
+  private void mergeConflictingDVs(Map<String, DeleteFileSet> 
committedDVsForDataFile) {
+    Set<MergedDVResult> mergedDVResults = Sets.newConcurrentHashSet();
+    Tasks.foreach(committedDVsForDataFile.entrySet())
+        .executeWith(ThreadPools.getDeleteWorkerPool())
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .run(committedDVs -> 
mergedDVResults.add(mergeAndWriteDV(committedDVs)));
+    // replace the currently conflicting DVs with the merged DV
+    mergedDVResults.forEach(this::replaceConflictingWithMergedDV);
+  }
+
+  private MergedDVResult mergeAndWriteDV(Map.Entry<String, DeleteFileSet> 
committedDVsForDataFile) {
+    String dataFile = committedDVsForDataFile.getKey();
+    DeleteFile committedDV = 
committedDVsForDataFile.getValue().iterator().next();
+    int specId = committedDV.specId();
+    PartitionSpec spec = spec(specId);
+    DeleteFileSet conflictingDVs = dvsWithReferencedDataFile(spec, dataFile);
+    PositionDeleteIndex mergedPositions =
+        mergeConflictingDVs(Iterables.concat(conflictingDVs, 
committedDVsForDataFile.getValue()));
+    try {
+      DVFileWriter dvFileWriter =
+          new BaseDVFileWriter(
+              OutputFileFactory.builderFor(ops(), spec(specId), 
FileFormat.PUFFIN, 1, 1).build(),
+              path -> null);
+      dvFileWriter.delete(dataFile, mergedPositions, spec, 
committedDV.partition());
+      dvFileWriter.close();
+      DeleteWriteResult result = dvFileWriter.result();
+      DeleteFile mergedDV = Iterables.getOnlyElement(result.deleteFiles());
+      return new MergedDVResult(
+          new PendingDeleteFile(mergedDV), committedDVsForDataFile.getValue(), 
conflictingDVs);
     } catch (IOException e) {
-      throw new UncheckedIOException(e);
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  private static class MergedDVResult {
+    private final DeleteFile mergedDV;
+    private final Set<DeleteFile> committedDVs;
+    private final Set<DeleteFile> conflictingDVs;
+
+    MergedDVResult(
+        DeleteFile mergedDV, Set<DeleteFile> committedDVs, Set<DeleteFile> 
conflictingDVs) {
+      this.mergedDV = mergedDV;
+      this.committedDVs = committedDVs;
+      this.conflictingDVs = conflictingDVs;
+    }
+
+    public DeleteFile mergedDV() {
+      return mergedDV;
     }
+
+    public Set<DeleteFile> committedDVs() {
+      return committedDVs;
+    }
+
+    public Set<DeleteFile> conflictingDVs() {
+      return conflictingDVs;
+    }
+  }
+
+  private PositionDeleteIndex mergeConflictingDVs(Iterable<DeleteFile> 
conflictingDVs) {
+    Iterator<DeleteFile> confictingDVIterator = conflictingDVs.iterator();
+    PositionDeleteIndex mergedPositions = readDV(confictingDVIterator.next());
+    confictingDVIterator.forEachRemaining(dv -> 
mergedPositions.merge(readDV(dv)));
+    return mergedPositions;
+  }
+
+  private void replaceConflictingWithMergedDV(MergedDVResult mergedDVResult) {
+    DeleteFile mergedDV = mergedDVResult.mergedDV();
+    Set<DeleteFile> conflictingDVs = mergedDVResult.conflictingDVs();
+    // Remove the committed DVs from metadata
+    mergedDVResult.committedDVs().forEach(this::delete);
+    DeleteFileSet deleteFilesForSpec = 
newDeleteFilesBySpec.get(mergedDV.specId());
+    // Remove the pending conflicting DVs
+    deleteFilesForSpec.removeAll(conflictingDVs);
+    // Add the merged DV
+    deleteFilesForSpec.add(mergedDV);
+  }
+
+  private PositionDeleteIndex readDV(DeleteFile dv) {
+    LOG.trace("Opening DV file {}", dv.location());
+    InputFile inputFile = EncryptingFileIO.combine(ops().io(), 
ops().encryption()).newInputFile(dv);
+    long offset = dv.contentOffset();
+    int length = dv.contentSizeInBytes().intValue();
+    byte[] bytes = IOUtil.readBytes(inputFile, offset, length);
+    return PositionDeleteIndex.deserialize(bytes, dv);
+  }

Review Comment:
   Couldn't use DeleteLoader since it's in iceberg-data, I think it maybe worth 
exposing this API on the Deletes class? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to