anoopj commented on code in PR #15653:
URL: https://github.com/apache/iceberg/pull/15653#discussion_r3001794037


##########
core/src/test/java/org/apache/iceberg/TestRowDelta.java:
##########
@@ -2187,6 +2189,133 @@ public void testDuplicateDVsAreMergedAndEqDelete() throws IOException {
     assertThat(committedEqDelete.content()).isEqualTo(FileContent.EQUALITY_DELETES);
   }
 
+  @TestTemplate
+  public void testConcurrentDVsInDifferentPartitionsWithFilter() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    // bucket16("u") -> 0, bucket16("a") -> 2
+    DataFile dataFileInBucket0 = newDataFile("data_bucket=0");
+    DataFile dataFileInBucket2 = newDataFile("data_bucket=2");
+    commit(
+        table, table.newRowDelta().addRows(dataFileInBucket0).addRows(dataFileInBucket2), branch);
+
+    Snapshot base = latestSnapshot(table, branch);
+
+    // prepare a DV for bucket 0 with a conflict detection filter scoped to bucket 0
+    DeleteFile dvBucket0 = newDeletes(dataFileInBucket0);
+    RowDelta rowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(dvBucket0)
+            .validateFromSnapshot(base.snapshotId())
+            .conflictDetectionFilter(Expressions.equal("data", "u")); // bucket16("u") -> 0
+
+    // concurrently commit a DV in bucket 2
+    DeleteFile dvBucket2 = newDeletes(dataFileInBucket2);
+    commit(table, table.newRowDelta().addDeletes(dvBucket2), branch);
+
+    // commit should succeed because the concurrent DV is in bucket 2
+    // which does not overlap the conflict detection filter
+    commit(table, rowDelta, branch);

Review Comment:
   Good idea. Done. 



##########
core/src/test/java/org/apache/iceberg/TestRowDelta.java:
##########
@@ -2187,6 +2189,133 @@ public void testDuplicateDVsAreMergedAndEqDelete() throws IOException {
     assertThat(committedEqDelete.content()).isEqualTo(FileContent.EQUALITY_DELETES);
   }
 
+  @TestTemplate
+  public void testConcurrentDVsInDifferentPartitionsWithFilter() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    // bucket16("u") -> 0, bucket16("a") -> 2
+    DataFile dataFileInBucket0 = newDataFile("data_bucket=0");
+    DataFile dataFileInBucket2 = newDataFile("data_bucket=2");
+    commit(
+        table, table.newRowDelta().addRows(dataFileInBucket0).addRows(dataFileInBucket2), branch);
+
+    Snapshot base = latestSnapshot(table, branch);
+
+    // prepare a DV for bucket 0 with a conflict detection filter scoped to bucket 0
+    DeleteFile dvBucket0 = newDeletes(dataFileInBucket0);
+    RowDelta rowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(dvBucket0)
+            .validateFromSnapshot(base.snapshotId())
+            .conflictDetectionFilter(Expressions.equal("data", "u")); // bucket16("u") -> 0
+
+    // concurrently commit a DV in bucket 2
+    DeleteFile dvBucket2 = newDeletes(dataFileInBucket2);
+    commit(table, table.newRowDelta().addDeletes(dvBucket2), branch);
+
+    // commit should succeed because the concurrent DV is in bucket 2
+    // which does not overlap the conflict detection filter
+    commit(table, rowDelta, branch);
+  }
+
+  @TestTemplate
+  public void testConcurrentDVsInSamePartitionWithFilter() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    // bucket16("u") -> 0
+    DataFile dataFile = newDataFile("data_bucket=0");
+    commit(table, table.newRowDelta().addRows(dataFile), branch);
+
+    Snapshot base = latestSnapshot(table, branch);
+
+    // prepare a DV for dataFile with a conflict detection filter scoped to bucket 0
+    DeleteFile dv1 = newDeletes(dataFile);
+    RowDelta rowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(dv1)
+            .validateFromSnapshot(base.snapshotId())
+            .conflictDetectionFilter(Expressions.equal("data", "u")); // bucket16("u") -> 0
+
+    // concurrently commit another DV for the same data file in bucket 0
+    DeleteFile dv2 = newDeletes(dataFile);
+    commit(table, table.newRowDelta().addDeletes(dv2), branch);
+
+    // must be conflict because the concurrent DV is in the same partition
+    assertThatThrownBy(() -> commit(table, rowDelta, branch))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Found concurrently added DV for %s", dataFile.location());
+  }
+
+  @TestTemplate
+  public void testDVValidationPartitionPruningManifestCount() {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    // disable manifest merging so each commit produces a separate delete manifest
+    table.updateProperties().set(TableProperties.MANIFEST_MERGE_ENABLED, "false").commit();
+
+    // create data files and DVs across 10 different partitions (buckets 0-9)
+    int numPartitions = 10;
+    DataFile[] dataFiles = new DataFile[numPartitions];
+    for (int bucket = 0; bucket < numPartitions; bucket++) {
+      dataFiles[bucket] = newDataFile("data_bucket=" + bucket);
+      commit(table, table.newRowDelta().addRows(dataFiles[bucket]), branch);
+      DeleteFile dv = newDeletes(dataFiles[bucket]);
+      commit(table, table.newRowDelta().addDeletes(dv), branch);
+    }
+
+    Snapshot base = latestSnapshot(table, branch);
+    List<ManifestFile> deleteManifests = base.deleteManifests(table.io());
+
+    // there should be one delete manifest per partition since we disabled merging
+    assertThat(deleteManifests).hasSizeGreaterThanOrEqualTo(numPartitions);
+
+    // count how many manifests match a filter scoped to bucket 0
+    // bucket16("u") -> 0
+    Expression filter = Expressions.equal("data", "u");
+    int matching = 0;
+    for (ManifestFile manifest : deleteManifests) {
+      PartitionSpec spec = table.specs().get(manifest.partitionSpecId());
+      Expression partitionFilter = Projections.inclusive(spec, true).project(filter);
+      ManifestEvaluator evaluator =
+          ManifestEvaluator.forPartitionFilter(partitionFilter, spec, true);
+      if (evaluator.eval(manifest)) {
+        matching = matching + 1;
+      }
+    }
+
+    // only 1 out of N manifests should match the filter for bucket 0
+    assertThat(matching).isEqualTo(1);
+    assertThat(deleteManifests.size() - matching)
+        .as("pruned manifests")
+        .isGreaterThanOrEqualTo(numPartitions - 1);
+
+    // verify the DV manifest pruning works: commit a new DV in bucket 0
+    // while concurrent DVs exist in all other partitions
+    DataFile newDataFileInBucket0 = newDataFile("data_bucket=0");
+    commit(table, table.newRowDelta().addRows(newDataFileInBucket0), branch);
+
+    Snapshot preCommit = latestSnapshot(table, branch);
+    DeleteFile newDV = newDeletes(newDataFileInBucket0);
+    RowDelta rowDelta =
+        table
+            .newRowDelta()
+            .addDeletes(newDV)
+            .validateFromSnapshot(preCommit.snapshotId())
+            .conflictDetectionFilter(Expressions.equal("data", "u")); // bucket16("u") -> 0
+
+    // concurrently add a DV in a different partition (bucket 5)
+    // bucket16("v") -> 5
+    DataFile newDataFileInBucket5 = newDataFile("data_bucket=5");
+    commit(table, table.newRowDelta().addRows(newDataFileInBucket5), branch);
+    DeleteFile concurrentDV = newDeletes(newDataFileInBucket5);
+    commit(table, table.newRowDelta().addDeletes(concurrentDV), branch);
+
+    // commit should succeed: the concurrent DV is in bucket 5, pruned by the filter
+    commit(table, rowDelta, branch);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to