fqaiser94 commented on code in PR #6513: URL: https://github.com/apache/iceberg/pull/6513#discussion_r1142716546
########## core/src/test/java/org/apache/iceberg/TestCustomValidations.java: ########## @@ -0,0 +1,643 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.IOException; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + 
+@RunWith(Parameterized.class) +public class TestCustomValidations extends TableTestBase { + + @Parameterized.Parameters(name = "formatVersion = {0}") + public static Object[] parameters() { + return new Object[] {1, 2}; + } + + public TestCustomValidations(int formatVersion) { + super(formatVersion); + } + + private <E> E setupTableAndEnv(Supplier<E> setupEnv) throws Exception { + cleanupTables(); + setupTable(); + return setupEnv.get(); + } + + private <E, T> void testValidationPasses( + Supplier<E> setupEnv, + Function<E, PendingUpdate<T>> pendingUpdateSupplier, + Consumer<E> assertSuccess) + throws Exception { + E env = setupTableAndEnv(setupEnv); + + PendingUpdate<T> pendingUpdate = pendingUpdateSupplier.apply(env); + pendingUpdate.validateCurrentTable(currentTable -> true, "Never fails."); + pendingUpdate.commit(); + + assertSuccess.accept(env); + } + + private <E, T> void testValidationFails( + Supplier<E> setupEnv, + Function<E, PendingUpdate<T>> pendingUpdateSupplier, + Consumer<E> assertFailure) + throws Exception { + E env = setupTableAndEnv(setupEnv); + + AssertHelpers.assertThrows( + "Should throw a ValidationException if the given predicate returns false", + ValidationException.class, + "Test returned: false", + () -> { + PendingUpdate<T> pendingUpdate = pendingUpdateSupplier.apply(env); + pendingUpdate.validateCurrentTable(currentTable -> false, "Test returned: %b", false); + pendingUpdate.commit(); + }); + + assertFailure.accept(env); + } + + private <E, T> void testFirstValidationFails( + Supplier<E> setupEnv, + Function<E, PendingUpdate<T>> pendingUpdateSupplier, + Consumer<E> assertFailure) + throws Exception { + E env = setupTableAndEnv(setupEnv); + + AssertHelpers.assertThrows( + "Should throw a ValidationException if the first predicate returns false", + ValidationException.class, + "First test returned: false", + () -> { + PendingUpdate<T> pendingUpdate = pendingUpdateSupplier.apply(env); + pendingUpdate.validateCurrentTable( + currentTable 
-> false, "First test returned: %b", false); + pendingUpdate.validateCurrentTable( + currentTable -> true, "Second test returned: %b", true); + pendingUpdate.commit(); + }); + + assertFailure.accept(env); + } + + private <E, T> void testSecondValidationFails( + Supplier<E> setupEnv, + Function<E, PendingUpdate<T>> pendingUpdateSupplier, + Consumer<E> assertFailure) + throws Exception { + E env = setupTableAndEnv(setupEnv); + + AssertHelpers.assertThrows( + "Should throw a ValidationException if the second predicate returns false", + ValidationException.class, + "Second test returned: false", + () -> { + PendingUpdate<T> pendingUpdate = pendingUpdateSupplier.apply(env); + pendingUpdate.validateCurrentTable(currentTable -> true, "First test returned: %b", true); + pendingUpdate.validateCurrentTable( + currentTable -> false, "Second test returned: %b", false); + pendingUpdate.commit(); + }); + + assertFailure.accept(env); + } + + private <E, T> void testValidationFailsDueToConcurrentCommit( + Supplier<E> setupEnv, + Function<E, PendingUpdate<T>> pendingUpdateSupplier, + Consumer<E> assertFailure) + throws Exception { + E env = setupTableAndEnv(setupEnv); + + String customWatermarkKey = "custom_watermark"; + String currentCustomWatermarkValue = "1"; + String nextCustomWatermarkValue = "2"; + table.updateProperties().set(customWatermarkKey, currentCustomWatermarkValue).commit(); + + PendingUpdate<T> uncommitted = pendingUpdateSupplier.apply(env); + String failMessage = "Test failed"; + uncommitted.validateCurrentTable( + currentTable -> + Objects.equals( + currentTable.properties().get(customWatermarkKey), currentCustomWatermarkValue), + failMessage); + + // concurrent update to the table which advances our custom_watermark value + table.updateProperties().set(customWatermarkKey, nextCustomWatermarkValue).commit(); + + if (uncommitted instanceof UpdateSchema + || uncommitted instanceof UpdatePartitionSpec + || uncommitted instanceof UpdateSnapshotReferencesOperation) { 
+ // The implementations of the above interfaces do not refresh to get the latest + // TableMetadata before calling the underlying table's commit method. + // As a result, no ValidationException is thrown because they do not see the concurrent + // modifications until the underlying table's commit method is called which is when they + // detect the TableMetadata is out-of-date and the commit attempt fails at that point. + // Either way, we are able to ensure that we never commit to the table unless we are assured + // that the validations hold for the current version of the table. + AssertHelpers.assertThrows( + "Should throw a CommitFailedException on commit due to concurrent update causing metadata to become stale.", + CommitFailedException.class, + "Cannot commit changes based on stale metadata", + uncommitted::commit); + } else { + AssertHelpers.assertThrows( + "Should throw a ValidationException on commit due to concurrent update causing the given predicate to return false", + ValidationException.class, + failMessage, + uncommitted::commit); + } + + assertFailure.accept(env); + } + + private <E, T> void testModifyingTableInsideValidationThrowsException( + Supplier<E> setupEnv, + Function<E, PendingUpdate<T>> pendingUpdateSupplier, + Consumer<E> assertFailure) + throws Exception { + E env = setupTableAndEnv(setupEnv); + + AssertHelpers.assertThrows( + "Any attempts to modify a table inside a validation should throw an exception", + java.lang.UnsupportedOperationException.class, + "Cannot modify a static table", + () -> { + PendingUpdate<T> pendingUpdate = pendingUpdateSupplier.apply(env); + pendingUpdate.validateCurrentTable( + currentTable -> { + // illegal action + currentTable.updateProperties().set("custom_watermark", "2").commit(); + return true; + }, + "Test failed."); + pendingUpdate.commit(); + }); + + assertFailure.accept(env); + } + + private <E, T> void testValidationBehaviours( + Supplier<E> setup, + Function<E, PendingUpdate<T>> 
pendingUpdateSupplier, + Consumer<E> assertSuccess, + Consumer<E> assertFailure) + throws Exception { + testValidationPasses(setup, pendingUpdateSupplier, assertSuccess); + testValidationFails(setup, pendingUpdateSupplier, assertFailure); + testFirstValidationFails(setup, pendingUpdateSupplier, assertFailure); + testSecondValidationFails(setup, pendingUpdateSupplier, assertFailure); + testValidationFailsDueToConcurrentCommit(setup, pendingUpdateSupplier, assertFailure); + testModifyingTableInsideValidationThrowsException(setup, pendingUpdateSupplier, assertFailure); + } + + private <T> void testValidationBehaviours( + Supplier<PendingUpdate<T>> pendingUpdateSupplier, + Runnable assertSuccess, + Runnable assertFailure) + throws Exception { + testValidationBehaviours( + () -> null, + (__) -> pendingUpdateSupplier.get(), + (__) -> assertSuccess.run(), + (__) -> assertFailure.run()); + } + + @Test + public void testCherryPickOperation() throws Exception { + class Setup { + final long firstSnapshotId; + final long overwriteSnapshotId; + + Setup(long firstSnapshotId, long overwriteSnapshotId) { + this.firstSnapshotId = firstSnapshotId; + this.overwriteSnapshotId = overwriteSnapshotId; + } + } + + testValidationBehaviours( Review Comment: I appreciate the politely-worded comment but I am going to push back a little here. In general, I agree with having clear and readable tests over deduplicated code and in fact, I originally wrote the tests without any "deduplication" but quickly found that maintaining them was a massive burden. I "deduplicated" the code to ease this and didn't personally feel the readability was compromised that badly but naturally, as the author of the code I have obvious bias :D I'll leave this as unresolved for now to see if anyone else finds the tests extremely unreadable, in which case I can make the requested changes. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org