fqaiser94 commented on code in PR #6513:
URL: https://github.com/apache/iceberg/pull/6513#discussion_r1623751172


##########
core/src/test/java/org/apache/iceberg/TestCustomValidations.java:
##########
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestCustomValidations extends TableTestBase {
+
+  @Parameterized.Parameters(name = "formatVersion = {0}")
+  public static Object[] parameters() {
+    return new Object[] {1, 2};
+  }
+
+  public TestCustomValidations(int formatVersion) {
+    super(formatVersion);
+  }
+
+  /**
+   * Re-initialises the test table and then builds the per-case environment.
+   *
+   * <p>Table state is reset through {@code cleanupTables()} followed by {@code setupTable()}
+   * (inherited from TableTestBase; presumably drop-and-recreate — confirm there).
+   *
+   * @param setupEnv invoked only after the table has been re-initialised
+   * @return whatever {@code setupEnv} produces
+   */
+  private <E> E setupTableAndEnv(Supplier<E> setupEnv) throws Exception {
+    cleanupTables();
+    setupTable();
+    E freshEnv = setupEnv.get();
+    return freshEnv;
+  }
+
+  /**
+   * Commits an update guarded by an always-true validation and checks that the commit landed.
+   *
+   * @param setupEnv builds the per-case environment after the table is re-initialised
+   * @param pendingUpdateSupplier creates the update under test from the environment
+   * @param assertSuccess verifies the table reflects the committed update
+   */
+  private <E, T> void testValidationPasses(
+      Supplier<E> setupEnv,
+      Function<E, PendingUpdate<T>> pendingUpdateSupplier,
+      Consumer<E> assertSuccess)
+      throws Exception {
+    E environment = setupTableAndEnv(setupEnv);
+    PendingUpdate<T> update = pendingUpdateSupplier.apply(environment);
+    // A predicate that ignores the table and always holds must never block the commit.
+    update.validateCurrentTable(ignored -> true, "Never fails.");
+    update.commit();
+    assertSuccess.accept(environment);
+  }
+
+  /**
+   * Checks that a single always-false validation aborts the commit with its formatted message.
+   *
+   * <p>The expected message "Test returned: false" is the result of applying the {@code %b}
+   * template to the {@code false} argument passed to {@code validateCurrentTable}.
+   *
+   * @param setupEnv builds the per-case environment after the table is re-initialised
+   * @param pendingUpdateSupplier creates the update under test from the environment
+   * @param assertFailure verifies the table does not reflect the rejected update
+   */
+  private <E, T> void testValidationFails(
+      Supplier<E> setupEnv,
+      Function<E, PendingUpdate<T>> pendingUpdateSupplier,
+      Consumer<E> assertFailure)
+      throws Exception {
+    E env = setupTableAndEnv(setupEnv);
+
+    AssertHelpers.assertThrows(
+        "Should throw a ValidationException if the given predicate returns false",
+        ValidationException.class,
+        "Test returned: false",
+        () -> {
+          PendingUpdate<T> pendingUpdate = pendingUpdateSupplier.apply(env);
+          pendingUpdate.validateCurrentTable(currentTable -> false, "Test returned: %b", false);
+          pendingUpdate.commit();
+        });
+
+    assertFailure.accept(env);
+  }
+
+  /**
+   * Checks that when the first of two registered validations fails, the commit is aborted with
+   * the first validation's message.
+   *
+   * @param setupEnv builds the per-case environment after the table is re-initialised
+   * @param pendingUpdateSupplier creates the update under test from the environment
+   * @param assertFailure verifies the table does not reflect the rejected update
+   */
+  private <E, T> void testFirstValidationFails(
+      Supplier<E> setupEnv,
+      Function<E, PendingUpdate<T>> pendingUpdateSupplier,
+      Consumer<E> assertFailure)
+      throws Exception {
+    E env = setupTableAndEnv(setupEnv);
+
+    AssertHelpers.assertThrows(
+        "Should throw a ValidationException if the first predicate returns false",
+        ValidationException.class,
+        "First test returned: false",
+        () -> {
+          PendingUpdate<T> pendingUpdate = pendingUpdateSupplier.apply(env);
+          pendingUpdate.validateCurrentTable(
+              currentTable -> false, "First test returned: %b", false);
+          pendingUpdate.validateCurrentTable(
+              currentTable -> true, "Second test returned: %b", true);
+          pendingUpdate.commit();
+        });
+
+    assertFailure.accept(env);
+  }
+
+  /**
+   * Checks that when the first validation passes but the second fails, the commit is aborted
+   * with the second validation's message.
+   *
+   * @param setupEnv builds the per-case environment after the table is re-initialised
+   * @param pendingUpdateSupplier creates the update under test from the environment
+   * @param assertFailure verifies the table does not reflect the rejected update
+   */
+  private <E, T> void testSecondValidationFails(
+      Supplier<E> setupEnv,
+      Function<E, PendingUpdate<T>> pendingUpdateSupplier,
+      Consumer<E> assertFailure)
+      throws Exception {
+    E env = setupTableAndEnv(setupEnv);
+
+    AssertHelpers.assertThrows(
+        "Should throw a ValidationException if the second predicate returns false",
+        ValidationException.class,
+        "Second test returned: false",
+        () -> {
+          PendingUpdate<T> pendingUpdate = pendingUpdateSupplier.apply(env);
+          pendingUpdate.validateCurrentTable(currentTable -> true, "First test returned: %b", true);
+          pendingUpdate.validateCurrentTable(
+              currentTable -> false, "Second test returned: %b", false);
+          pendingUpdate.commit();
+        });
+
+    assertFailure.accept(env);
+  }
+
+  /**
+   * Checks that a validation is judged against the table as it stands at commit time, not as it
+   * stood when the validation was registered.
+   *
+   * <p>The table property {@code custom_watermark} is first set to "1", and the update under test
+   * is guarded by a predicate requiring that value. A second, concurrent property update then
+   * advances the watermark to "2" before the guarded update commits, so the commit must fail.
+   *
+   * @param setupEnv builds the per-case environment after the table is re-initialised
+   * @param pendingUpdateSupplier creates the update under test from the environment
+   * @param assertFailure verifies the table does not reflect the rejected update
+   */
+  private <E, T> void testValidationFailsDueToConcurrentCommit(
+      Supplier<E> setupEnv,
+      Function<E, PendingUpdate<T>> pendingUpdateSupplier,
+      Consumer<E> assertFailure)
+      throws Exception {
+    E env = setupTableAndEnv(setupEnv);
+
+    String customWatermarkKey = "custom_watermark";
+    String currentCustomWatermarkValue = "1";
+    String nextCustomWatermarkValue = "2";
+    table.updateProperties().set(customWatermarkKey, currentCustomWatermarkValue).commit();
+
+    PendingUpdate<T> uncommitted = pendingUpdateSupplier.apply(env);
+    String failMessage = "Test failed";
+    uncommitted.validateCurrentTable(
+        currentTable ->
+            Objects.equals(
+                currentTable.properties().get(customWatermarkKey), currentCustomWatermarkValue),
+        failMessage);
+
+    // concurrent update to the table which advances our custom_watermark value
+    table.updateProperties().set(customWatermarkKey, nextCustomWatermarkValue).commit();
+
+    if (uncommitted instanceof UpdateSchema
+        || uncommitted instanceof UpdatePartitionSpec
+        || uncommitted instanceof UpdateSnapshotReferencesOperation) {
+      // The implementations of the above interfaces do not refresh to get the latest
+      // TableMetadata before calling the underlying table's commit method.
+      // As a result, no ValidationException is thrown because they do not see the concurrent
+      // modifications until the underlying table's commit method is called, which is when they
+      // detect that the TableMetadata is out-of-date and the commit attempt fails.
+      // Either way, we are able to ensure that we never commit to the table unless we are assured
+      // that the validations hold for the current version of the table.
+      AssertHelpers.assertThrows(
+          "Should throw a CommitFailedException on commit due to concurrent update causing metadata to become stale.",
+          CommitFailedException.class,
+          "Cannot commit changes based on stale metadata",
+          uncommitted::commit);
+    } else {
+      AssertHelpers.assertThrows(
+          "Should throw a ValidationException on commit due to concurrent update causing the given predicate to return false",
+          ValidationException.class,
+          failMessage,
+          uncommitted::commit);
+    }
+
+    assertFailure.accept(env);
+  }
+
+  /**
+   * Checks that a validation predicate cannot modify the table it is handed.
+   *
+   * <p>The predicate attempts to commit a property change on the table passed to it. The test
+   * expects an {@link UnsupportedOperationException} with the message "Cannot modify a static
+   * table" rather than a successful commit.
+   *
+   * @param setupEnv builds the per-case environment after the table is re-initialised
+   * @param pendingUpdateSupplier creates the update under test from the environment
+   * @param assertFailure verifies the table does not reflect the rejected update
+   */
+  private <E, T> void testModifyingTableInsideValidationThrowsException(
+      Supplier<E> setupEnv,
+      Function<E, PendingUpdate<T>> pendingUpdateSupplier,
+      Consumer<E> assertFailure)
+      throws Exception {
+    E env = setupTableAndEnv(setupEnv);
+
+    AssertHelpers.assertThrows(
+        "Any attempts to modify a table inside a validation should throw an exception",
+        UnsupportedOperationException.class,
+        "Cannot modify a static table",
+        () -> {
+          PendingUpdate<T> pendingUpdate = pendingUpdateSupplier.apply(env);
+          pendingUpdate.validateCurrentTable(
+              currentTable -> {
+                // illegal action: validations must treat the table they receive as read-only
+                currentTable.updateProperties().set("custom_watermark", "2").commit();
+                return true;
+              },
+              "Test failed.");
+          pendingUpdate.commit();
+        });
+
+    assertFailure.accept(env);
+  }
+
+  /**
+   * Runs every custom-validation scenario against one kind of {@link PendingUpdate}.
+   *
+   * <p>Each scenario re-initialises the table through {@code setupTableAndEnv} before it runs.
+   *
+   * @param setup builds the per-case environment for each scenario
+   * @param pendingUpdateSupplier creates the update under test from the environment
+   * @param assertSuccess verifies the table after a commit whose validations pass
+   * @param assertFailure verifies the table after a commit that was rejected
+   */
+  private <E, T> void testValidationBehaviours(
+      Supplier<E> setup,
+      Function<E, PendingUpdate<T>> pendingUpdateSupplier,
+      Consumer<E> assertSuccess,
+      Consumer<E> assertFailure)
+      throws Exception {
+    testValidationPasses(setup, pendingUpdateSupplier, assertSuccess);
+    testValidationFails(setup, pendingUpdateSupplier, assertFailure);
+    testFirstValidationFails(setup, pendingUpdateSupplier, assertFailure);
+    testSecondValidationFails(setup, pendingUpdateSupplier, assertFailure);
+    testValidationFailsDueToConcurrentCommit(setup, pendingUpdateSupplier, assertFailure);
+    testModifyingTableInsideValidationThrowsException(setup, pendingUpdateSupplier, assertFailure);
+  }
+
+  /**
+   * Convenience overload for updates that need no per-case environment.
+   *
+   * <p>Adapts the no-argument supplier and runnables to the environment-taking variant, using a
+   * {@code null} environment that every adapter ignores.
+   *
+   * @param pendingUpdateSupplier creates the update under test
+   * @param assertSuccess verifies the table after a commit whose validations pass
+   * @param assertFailure verifies the table after a commit that was rejected
+   */
+  private <T> void testValidationBehaviours(
+      Supplier<PendingUpdate<T>> pendingUpdateSupplier,
+      Runnable assertSuccess,
+      Runnable assertFailure)
+      throws Exception {
+    Supplier<Object> noEnvironment = () -> null;
+    Function<Object, PendingUpdate<T>> updateFactory = ignored -> pendingUpdateSupplier.get();
+    Consumer<Object> onSuccess = ignored -> assertSuccess.run();
+    Consumer<Object> onFailure = ignored -> assertFailure.run();
+    testValidationBehaviours(noEnvironment, updateFactory, onSuccess, onFailure);
+  }
+
+  @Test
+  public void testCherryPickOperation() throws Exception {
+    class Setup {
+      final long firstSnapshotId;
+      final long overwriteSnapshotId;
+
+      Setup(long firstSnapshotId, long overwriteSnapshotId) {
+        this.firstSnapshotId = firstSnapshotId;
+        this.overwriteSnapshotId = overwriteSnapshotId;
+      }
+    }
+
+    testValidationBehaviours(

Review Comment:
   @nastra I've gotten rid of all the lambdas and written a separate test for each case now 😄



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to