jbonofre commented on code in PR #11495:
URL: https://github.com/apache/iceberg/pull/11495#discussion_r1835329015


##########
core/src/main/java/org/apache/iceberg/BaseRowDelta.java:
##########
@@ -139,6 +139,10 @@ protected void validate(TableMetadata base, Snapshot parent) {
       if (validateNewDeleteFiles) {
         validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, parent);
       }
+
+      if (base.formatVersion() >= 3 && addsDeleteFiles()) {

Review Comment:
   Do we really need the format version check here? Why not just use the 
`addsDeleteFiles` flag? (Just wondering.)



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -245,13 +250,13 @@ private PartitionSpec spec(int specId) {
 
   /** Add a delete file to the new snapshot. */
   protected void add(DeleteFile file) {
-    Preconditions.checkNotNull(file, "Invalid delete file: null");
+    validateNewDeleteFile(file);
     add(new PendingDeleteFile(file));
   }
 
   /** Add a delete file to the new snapshot. */
   protected void add(DeleteFile file, long dataSequenceNumber) {
-    Preconditions.checkNotNull(file, "Invalid delete file: null");
+    validateNewDeleteFile(file);

Review Comment:
   Thanks @aokolnychyi for the background. The purpose of 
`validateNewDeleteFile()` during `add` wasn't clear to me.



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -769,6 +794,82 @@ protected void validateDataFilesExist(
     }
   }
 
+  // validates there are no concurrently added DVs for referenced data files
+  protected void validateAddedDVs(
+      TableMetadata base,
+      Long startingSnapshotId,
+      Expression conflictDetectionFilter,
+      Snapshot parent) {
+    // skip if there is no current table state or table format doesn't support DVs
+    if (parent == null || base.formatVersion() < 3) {
+      return;
+    }
+
+    // skip if this operation doesn't add new DVs
+    Set<String> dvRefs = dvRefs();
+    if (dvRefs.isEmpty()) {
+      return;
+    }
+
+    Pair<List<ManifestFile>, Set<Long>> history =
+        validationHistory(
+            base,
+            startingSnapshotId,
+            VALIDATE_ADDED_DVS_OPERATIONS,
+            ManifestContent.DELETES,
+            parent);
+    List<ManifestFile> newDeleteManifests = history.first();
+    Set<Long> newSnapshotIds = history.second();
+
+    Tasks.foreach(newDeleteManifests)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(workerPool())
+        .run(m -> validateAddedDVs(m, conflictDetectionFilter, newSnapshotIds, dvRefs));

Review Comment:
   For clarity, I agree with @amogh-jahagirdar (`manifest` instead of `m`). I'm 
fine with using `DV` instead of `DeleteVector`, but `manifest` here would help 😃 



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -271,6 +276,26 @@ private void add(PendingDeleteFile file) {
     }
   }
 
+  protected void validateNewDeleteFile(DeleteFile file) {
+    Preconditions.checkNotNull(file, "Invalid delete file: null");
+    Preconditions.checkArgument(formatVersion() >= 2, "Deletes are supported in V2 and above");
+    if (file.content() == FileContent.POSITION_DELETES) {
+      Preconditions.checkArgument(
+          formatVersion() != 2 || !ContentFileUtil.isDV(file),
+          "Must not use DVs for position deletes in V2: %s",
+          ContentFileUtil.dvDesc(file));
+      Preconditions.checkArgument(
+          formatVersion() == 2 || ContentFileUtil.isDV(file),

Review Comment:
   Same question here: why not just check whether it's a DV? We know that a 
DV implies format version 3+, right?
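
   To make the question concrete, here is a minimal sketch of the two checks in the hunk above written as a truth table. It is only an illustration: `DvFormatRuleSketch` and `positionDeleteAllowed` are hypothetical names, the booleans stand in for `formatVersion()` and `ContentFileUtil.isDV(file)`, and the reading of the second check is inferred from its condition alone because its message is cut off in the diff context.

```java
// Hypothetical, self-contained sketch; this is not the Iceberg implementation.
public class DvFormatRuleSketch {

  // Mirrors the conditions shown in the diff for a position delete file:
  //   formatVersion >= 2            (deletes need V2 or above)
  //   formatVersion != 2 || !isDV   (V2 must not use DVs)
  //   formatVersion == 2 || isDV    (inferred: any other version must use DVs)
  static boolean positionDeleteAllowed(int formatVersion, boolean isDV) {
    if (formatVersion < 2) {
      return false;
    }
    boolean noDVsInV2 = formatVersion != 2 || !isDV;
    boolean dvsRequiredOutsideV2 = formatVersion == 2 || isDV;
    return noDVsInV2 && dvsRequiredOutsideV2;
  }

  public static void main(String[] args) {
    // Print the four combinations for V2 and V3.
    for (int version = 2; version <= 3; version++) {
      for (boolean isDV : new boolean[] {false, true}) {
        System.out.printf(
            "v%d isDV=%b allowed=%b%n", version, isDV, positionDeleteAllowed(version, isDV));
      }
    }
  }
}
```

   Under this reading, the (V3, non-DV) combination is rejected only because the format version is part of the condition; `isDV` alone would not distinguish it from a valid V2 position delete.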



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -769,6 +794,82 @@ protected void validateDataFilesExist(
     }
   }
 
+  // validates there are no concurrently added DVs for referenced data files
+  protected void validateAddedDVs(
+      TableMetadata base,
+      Long startingSnapshotId,
+      Expression conflictDetectionFilter,
+      Snapshot parent) {
+    // skip if there is no current table state or table format doesn't support DVs
+    if (parent == null || base.formatVersion() < 3) {
+      return;
+    }
+
+    // skip if this operation doesn't add new DVs
+    Set<String> dvRefs = dvRefs();
+    if (dvRefs.isEmpty()) {
+      return;
+    }
+
+    Pair<List<ManifestFile>, Set<Long>> history =
+        validationHistory(
+            base,
+            startingSnapshotId,
+            VALIDATE_ADDED_DVS_OPERATIONS,
+            ManifestContent.DELETES,
+            parent);
+    List<ManifestFile> newDeleteManifests = history.first();
+    Set<Long> newSnapshotIds = history.second();
+
+    Tasks.foreach(newDeleteManifests)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(workerPool())
+        .run(m -> validateAddedDVs(m, conflictDetectionFilter, newSnapshotIds, dvRefs));
+  }
+
+  private void validateAddedDVs(
+      ManifestFile manifest,
+      Expression conflictDetectionFilter,
+      Set<Long> newSnapshotIds,
+      Set<String> dvRefs) {
+    try (CloseableIterable<ManifestEntry<DeleteFile>> entries =
+        ManifestFiles.readDeleteManifest(manifest, ops.io(), ops.current().specsById())
+            .filterRows(conflictDetectionFilter)
+            .caseSensitive(caseSensitive)
+            .liveEntries()) {
+
+      for (ManifestEntry<DeleteFile> entry : entries) {
+        DeleteFile file = entry.file();
+        if (newSnapshotIds.contains(entry.snapshotId()) && ContentFileUtil.isDV(file)) {
+          ValidationException.check(
+              !dvRefs.contains(file.referencedDataFile()),
+              "Found concurrently added DV for %s: %s",
+              file.referencedDataFile(),
+              ContentFileUtil.dvDesc(file));
+        }
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  // builds a set of data file locations referenced by new DVs
+  private Set<String> dvRefs() {

Review Comment:
   nit: it's a private method, so the name is not a big deal.
   
   I think `dvRefs()` (or `dvReferences()`) makes more sense here, as it 
checks whether the `deleteFile` is a DV. So having DV in the method name makes 
sense to me.
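
   For readers following along, a rough sketch of how such a set could feed the conflict check in the hunk above. The body of `dvRefs()` is not shown in this diff, so everything here is an assumption: `DvConflictSketch`, `DeleteStub`, and `firstConflict` are hypothetical stand-ins, and the sketch returns an `Optional` where the real code calls `ValidationException.check`.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch; DeleteStub is a stand-in, not an Iceberg class.
public class DvConflictSketch {

  static final class DeleteStub {
    final String location;
    final String referencedDataFile; // null for non-DV deletes in this sketch
    final boolean isDV;

    DeleteStub(String location, String referencedDataFile, boolean isDV) {
      this.location = location;
      this.referencedDataFile = referencedDataFile;
      this.isDV = isDV;
    }
  }

  // Collects the data file locations referenced by the DVs this operation adds.
  static Set<String> dvRefs(List<DeleteStub> newDeletes) {
    Set<String> refs = new LinkedHashSet<>();
    for (DeleteStub delete : newDeletes) {
      if (delete.isDV) {
        refs.add(delete.referencedDataFile);
      }
    }
    return refs;
  }

  // Returns the first concurrently added DV that targets a data file in dvRefs, if any.
  static Optional<DeleteStub> firstConflict(Set<String> dvRefs, List<DeleteStub> concurrent) {
    for (DeleteStub delete : concurrent) {
      if (delete.isDV && dvRefs.contains(delete.referencedDataFile)) {
        return Optional.of(delete);
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    List<DeleteStub> mine = List.of(new DeleteStub("dv-1.puffin", "data-a.parquet", true));
    List<DeleteStub> concurrent =
        List.of(
            new DeleteStub("dv-2.puffin", "data-b.parquet", true),
            new DeleteStub("dv-3.puffin", "data-a.parquet", true));
    firstConflict(dvRefs(mine), concurrent)
        .ifPresent(
            d ->
                System.out.println(
                    "Found concurrently added DV for " + d.referencedDataFile + ": " + d.location));
  }
}
```

   Since only DVs contribute to the set in this sketch, a name that mentions DVs describes what the helper returns.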



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

