This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 00bac7ae56 FateIT Repo Error Test (#4282) 00bac7ae56 is described below commit 00bac7ae5633147502ce57b28532942323f05030 Author: Kevin Rathbun <43969518+kevinrr...@users.noreply.github.com> AuthorDate: Wed Feb 21 16:03:08 2024 -0500 FateIT Repo Error Test (#4282) --- .../accumulo/test/fate/zookeeper/FateIT.java | 141 +++++++++++++++++++-- 1 file changed, 133 insertions(+), 8 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java index 5e153b21b1..2dde8fabca 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java @@ -36,6 +36,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import java.io.File; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -115,6 +117,59 @@ public class FateIT { } + public static class TestOperationFails extends ManagerRepo { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(TestOperationFails.class); + private static List<String> undoOrder = new ArrayList<>(); + private static final int TOTAL_NUM_OPS = 3; + private int opNum; + private final String opName; + private final ExceptionLocation location; + + public TestOperationFails(int opNum, ExceptionLocation location) { + this.opNum = opNum; + this.opName = "OP" + opNum; + this.location = location; + } + + @Override + public long isReady(long tid, Manager environment) throws Exception { + LOG.debug("{} {} Entered isReady()", opName, FateTxId.formatTid(tid)); + if (location == ExceptionLocation.IS_READY) { + if (opNum < TOTAL_NUM_OPS) { + return 0; + } else { + throw new Exception( + opName + " " + FateTxId.formatTid(tid) + " isReady() failed - this is expected"); + } + } else { + return 0; + } + } + + @Override + public void undo(long tid, Manager environment) throws Exception { + LOG.debug("{} {} Entered undo()", opName, FateTxId.formatTid(tid)); + undoOrder.add(opName); + undoLatch.countDown(); + } + + @Override + public Repo<Manager> call(long tid, Manager environment) throws Exception { + LOG.debug("{} {} Entered call()", opName, FateTxId.formatTid(tid)); + if (location == ExceptionLocation.CALL) { + if (opNum < TOTAL_NUM_OPS) { + return new TestOperationFails(++opNum, location); + } else { + throw new Exception( + opName + " " + FateTxId.formatTid(tid) + " call() failed - this is expected"); + } + } else { + return new TestOperationFails(++opNum, location); + } + } + } + private static final Logger LOG = LoggerFactory.getLogger(FateIT.class); @TempDir @@ -128,6 +183,11 @@ public class FateIT { private static CountDownLatch callStarted; private static CountDownLatch finishCall; + private static CountDownLatch undoLatch; + + private enum ExceptionLocation { + CALL, IS_READY + }; @BeforeAll public static void setup() throws Exception { @@ -165,10 +225,6 @@ public class FateIT { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - fate.startTransactionRunners(config); - - // Wait for the transaction runner to be scheduled. - UtilWaitThread.sleep(3000); callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); @@ -177,6 +233,11 @@ public class FateIT { assertEquals(TStatus.NEW, getTxStatus(zk, txid)); fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op"); assertEquals(TStatus.SUBMITTED, getTxStatus(zk, txid)); + + fate.startTransactionRunners(config); + // Wait for the transaction runner to be scheduled. + UtilWaitThread.sleep(3000); + // wait for call() to be called callStarted.await(); assertEquals(IN_PROGRESS, getTxStatus(zk, txid)); @@ -346,10 +407,6 @@ public class FateIT { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - fate.startTransactionRunners(config); - - // Wait for the transaction runner to be scheduled. - UtilWaitThread.sleep(3000); callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); @@ -359,6 +416,11 @@ public class FateIT { assertEquals(NEW, getTxStatus(zk, txid)); fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op"); assertEquals(SUBMITTED, getTxStatus(zk, txid)); + + fate.startTransactionRunners(config); + // Wait for the transaction runner to be scheduled. + UtilWaitThread.sleep(3000); + // wait for call() to be called callStarted.await(); // cancel the transaction @@ -369,6 +431,69 @@ public class FateIT { } + @Test + public void testRepoFails() throws Exception { + /* + * This test ensures that when an exception occurs in a Repo's call() or isReady() methods, that + * undo() will be called back up the chain of Repo's and in the correct order. The test works as + * follows: 1) Repo1 is called and returns Repo2, 2) Repo2 is called and returns Repo3, 3) Repo3 + * is called and throws an exception (in call() or isReady()). It is then expected that: 1) + * undo() is called on Repo3, 2) undo() is called on Repo2, 3) undo() is called on Repo1 + */ + final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + Constants.ZFATE, zk); + final AgeOffStore<Manager> store = + new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis); + + Manager manager = createMock(Manager.class); + ServerContext sctx = createMock(ServerContext.class); + expect(manager.getContext()).andReturn(sctx).anyTimes(); + expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); + expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); + replay(manager, sctx); + + Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString); + try { + ConfigurationCopy config = new ConfigurationCopy(); + config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); + config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); + fate.startTransactionRunners(config); + + // Wait for the transaction runner to be scheduled. + UtilWaitThread.sleep(3000); + + List<String> expectedUndoOrder = List.of("OP3", "OP2", "OP1"); + /* + * Test exception in call() + */ + undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS); + long txid = fate.startTransaction(); + assertEquals(NEW, getTxStatus(zk, txid)); + fate.seedTransaction("TestOperationFails", txid, + new TestOperationFails(1, ExceptionLocation.CALL), false, "Test Op Fails"); + // Wait for all the undo() calls to complete + undoLatch.await(); + assertEquals(expectedUndoOrder, TestOperationFails.undoOrder); + assertEquals(FAILED, fate.waitForCompletion(txid)); + assertTrue(fate.getException(txid).getMessage().contains("call() failed")); + /* + * Test exception in isReady() + */ + TestOperationFails.undoOrder = new ArrayList<>(); + undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS); + txid = fate.startTransaction(); + assertEquals(NEW, getTxStatus(zk, txid)); + fate.seedTransaction("TestOperationFails", txid, + new TestOperationFails(1, ExceptionLocation.IS_READY), false, "Test Op Fails"); + // Wait for all the undo() calls to complete + undoLatch.await(); + assertEquals(expectedUndoOrder, TestOperationFails.undoOrder); + assertEquals(FAILED, fate.waitForCompletion(txid)); + assertTrue(fate.getException(txid).getMessage().contains("isReady() failed")); + } finally { + fate.shutdown(); + } + } + private static void inCall() throws InterruptedException { // signal that call started callStarted.countDown();