This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 00bac7ae56 FateIT Repo Error Test (#4282)
00bac7ae56 is described below

commit 00bac7ae5633147502ce57b28532942323f05030
Author: Kevin Rathbun <43969518+kevinrr...@users.noreply.github.com>
AuthorDate: Wed Feb 21 16:03:08 2024 -0500

    FateIT Repo Error Test (#4282)
---
 .../accumulo/test/fate/zookeeper/FateIT.java       | 141 +++++++++++++++++++--
 1 file changed, 133 insertions(+), 8 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
index 5e153b21b1..2dde8fabca 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
@@ -36,6 +36,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 
@@ -115,6 +117,59 @@ public class FateIT {
 
   }
 
+  /**
+   * Chain of TOTAL_NUM_OPS repos (OP1..) whose last op throws from call() or isReady(); each
+   * undo() records its op name in undoOrder and counts down undoLatch.
+   */
+  public static class TestOperationFails extends ManagerRepo {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(TestOperationFails.class);
+    private static List<String> undoOrder = new ArrayList<>();
+    private static final int TOTAL_NUM_OPS = 3;
+    private final int opNum;
+    private final String opName;
+    private final ExceptionLocation location;
+
+    public TestOperationFails(int opNum, ExceptionLocation location) {
+      this.opNum = opNum;
+      this.opName = "OP" + opNum;
+      this.location = location;
+    }
+
+    /** Whether this op is the last in the chain and is set to fail at {@code where}. */
+    private boolean failsAt(ExceptionLocation where) {
+      return location == where && opNum >= TOTAL_NUM_OPS;
+    }
+
+    @Override
+    public long isReady(long tid, Manager environment) throws Exception {
+      LOG.debug("{} {} Entered isReady()", opName, FateTxId.formatTid(tid));
+      if (failsAt(ExceptionLocation.IS_READY)) {
+        throw new Exception(
+            opName + " " + FateTxId.formatTid(tid) + " isReady() failed - this is expected");
+      }
+      return 0;
+    }
+
+    @Override
+    public void undo(long tid, Manager environment) throws Exception {
+      LOG.debug("{} {} Entered undo()", opName, FateTxId.formatTid(tid));
+      undoOrder.add(opName);
+      undoLatch.countDown();
+    }
+
+    @Override
+    public Repo<Manager> call(long tid, Manager environment) throws Exception {
+      LOG.debug("{} {} Entered call()", opName, FateTxId.formatTid(tid));
+      if (failsAt(ExceptionLocation.CALL)) {
+        throw new Exception(
+            opName + " " + FateTxId.formatTid(tid) + " call() failed - this is expected");
+      }
+      // Pass opNum + 1 instead of mutating opNum so it stays consistent with opName.
+      return new TestOperationFails(opNum + 1, location);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(FateIT.class);
 
   @TempDir
@@ -128,6 +183,11 @@ public class FateIT {
 
   private static CountDownLatch callStarted;
   private static CountDownLatch finishCall;
+  private static CountDownLatch undoLatch; // counted down by each TestOperationFails.undo()
+
+  private enum ExceptionLocation { // Repo method in which TestOperationFails throws
+    CALL, IS_READY
+  }
 
   @BeforeAll
   public static void setup() throws Exception {
@@ -165,10 +225,6 @@ public class FateIT {
       ConfigurationCopy config = new ConfigurationCopy();
       config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
       config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-      fate.startTransactionRunners(config);
-
-      // Wait for the transaction runner to be scheduled.
-      UtilWaitThread.sleep(3000);
 
       callStarted = new CountDownLatch(1);
       finishCall = new CountDownLatch(1);
@@ -177,6 +233,11 @@ public class FateIT {
       assertEquals(TStatus.NEW, getTxStatus(zk, txid));
       fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), 
true, "Test Op");
       assertEquals(TStatus.SUBMITTED, getTxStatus(zk, txid));
+
+      fate.startTransactionRunners(config);
+      // Wait for the transaction runner to be scheduled.
+      UtilWaitThread.sleep(3000);
+
       // wait for call() to be called
       callStarted.await();
       assertEquals(IN_PROGRESS, getTxStatus(zk, txid));
@@ -346,10 +407,6 @@ public class FateIT {
       ConfigurationCopy config = new ConfigurationCopy();
       config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
       config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-      fate.startTransactionRunners(config);
-
-      // Wait for the transaction runner to be scheduled.
-      UtilWaitThread.sleep(3000);
 
       callStarted = new CountDownLatch(1);
       finishCall = new CountDownLatch(1);
@@ -359,6 +416,11 @@ public class FateIT {
       assertEquals(NEW, getTxStatus(zk, txid));
       fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), 
true, "Test Op");
       assertEquals(SUBMITTED, getTxStatus(zk, txid));
+
+      fate.startTransactionRunners(config);
+      // Wait for the transaction runner to be scheduled.
+      UtilWaitThread.sleep(3000);
+
       // wait for call() to be called
       callStarted.await();
       // cancel the transaction
@@ -369,6 +431,69 @@ public class FateIT {
 
   }
 
+  /**
+   * Ensures that when a Repo's call() or isReady() throws, undo() is called back up the chain
+   * of Repos in the correct order: Repo1 returns Repo2, Repo2 returns Repo3, Repo3 throws, and
+   * undo() is then expected on Repo3, Repo2 and Repo1, in that order.
+   */
+  @Test
+  public void testRepoFails() throws Exception {
+    final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + Constants.ZFATE, zk);
+    final AgeOffStore<Manager> store =
+        new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
+
+    Manager manager = createMock(Manager.class);
+    ServerContext sctx = createMock(ServerContext.class);
+    expect(manager.getContext()).andReturn(sctx).anyTimes();
+    expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes();
+    expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
+    replay(manager, sctx);
+
+    Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString);
+    try {
+      ConfigurationCopy config = new ConfigurationCopy();
+      config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+      config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+      fate.startTransactionRunners(config);
+
+      // Wait for the transaction runner to be scheduled.
+      UtilWaitThread.sleep(3000);
+
+      // Same expectations whether the last Repo fails in call() or in isReady().
+      assertUndoneInReverse(fate, ExceptionLocation.CALL, "call() failed");
+      assertUndoneInReverse(fate, ExceptionLocation.IS_READY, "isReady() failed");
+    } finally {
+      fate.shutdown();
+    }
+  }
+
+  /**
+   * Seeds a TestOperationFails chain that throws at {@code location}, then checks that undo() ran
+   * on every op in reverse order and that the transaction FAILED with the expected message.
+   *
+   * @param fate Fate instance whose transaction runners are already started
+   * @param location Repo method in which the last op of the chain throws
+   * @param expectedMessage text the recorded transaction exception message must contain
+   */
+  private void assertUndoneInReverse(Fate<Manager> fate, ExceptionLocation location,
+      String expectedMessage) throws Exception {
+    // Fresh recording state per scenario so one run cannot leak into the next.
+    TestOperationFails.undoOrder = new ArrayList<>();
+    undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS);
+
+    long txid = fate.startTransaction();
+    assertEquals(NEW, getTxStatus(zk, txid));
+    fate.seedTransaction("TestOperationFails", txid, new TestOperationFails(1, location), false,
+        "Test Op Fails");
+
+    // Bounded wait: if an undo() is never called, fail here instead of hanging the build.
+    assertTrue(undoLatch.await(30, java.util.concurrent.TimeUnit.SECONDS),
+        "timed out waiting for undo() to be called on every op");
+    assertEquals(List.of("OP3", "OP2", "OP1"), TestOperationFails.undoOrder);
+    assertEquals(FAILED, fate.waitForCompletion(txid));
+    assertTrue(fate.getException(txid).getMessage().contains(expectedMessage));
+  }
+
   private static void inCall() throws InterruptedException {
     // signal that call started
     callStarted.countDown();

Reply via email to