This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit e3d62043051d22f57b7bd71760bec52352eb160c
Merge: 4886e821ed 00bac7ae56
Author: Ed Coleman <edcole...@apache.org>
AuthorDate: Wed Feb 21 22:17:04 2024 +0000

    Merge remote-tracking branch 'upstream/2.1'
    
     - merge FateIT into main
     - includes minor quality check fixes

 .../accumulo/test/fate/zookeeper/FateIT.java       | 160 ++++++++++++++++++---
 1 file changed, 139 insertions(+), 21 deletions(-)

diff --cc test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
index d4da1ebae4,2dde8fabca..a392313bd0
--- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
@@@ -121,12 -177,17 +176,17 @@@ public class FateIT 
  
    private static ZooKeeperTestingServer szk = null;
    private static ZooReaderWriter zk = null;
--  private static final String ZK_ROOT = "/accumulo/" + 
UUID.randomUUID().toString();
++  private static final String ZK_ROOT = "/accumulo/" + UUID.randomUUID();
    private static final NamespaceId NS = NamespaceId.of("testNameSpace");
    private static final TableId TID = TableId.of("testTable");
  
    private static CountDownLatch callStarted;
    private static CountDownLatch finishCall;
+   private static CountDownLatch undoLatch;
+ 
+   private enum ExceptionLocation {
+     CALL, IS_READY
 -  };
++  }
  
    @BeforeAll
    public static void setup() throws Exception {
@@@ -148,9 -209,9 +208,8 @@@
    @Timeout(30)
    public void testTransactionStatus() throws Exception {
  
--    final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + 
Constants.ZFATE, zk);
--    final AgeOffStore<Manager> store =
--        new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
++    final ZooStore<Manager> zooStore = new ZooStore<>(ZK_ROOT + 
Constants.ZFATE, zk);
++    final AgeOffStore<Manager> store = new AgeOffStore<>(zooStore, 3000, 
System::currentTimeMillis);
  
      Manager manager = createMock(Manager.class);
      ServerContext sctx = createMock(ServerContext.class);
@@@ -159,15 -220,12 +218,12 @@@
      expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
      replay(manager, sctx);
  
 -    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString);
 +    ConfigurationCopy config = new ConfigurationCopy();
 +    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
 +    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-     Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString, config);
++    Fate<Manager> fate = new Fate<>(manager, store, TraceRepo::toLogString, 
config);
      try {
 -      ConfigurationCopy config = new ConfigurationCopy();
 -      config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
 -      config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
  
-       // Wait for the transaction runner to be scheduled.
-       Thread.sleep(3000);
- 
        callStarted = new CountDownLatch(1);
        finishCall = new CountDownLatch(1);
  
@@@ -175,6 -233,11 +231,10 @@@
        assertEquals(TStatus.NEW, getTxStatus(zk, txid));
        fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), 
true, "Test Op");
        assertEquals(TStatus.SUBMITTED, getTxStatus(zk, txid));
+ 
 -      fate.startTransactionRunners(config);
+       // Wait for the transaction runner to be scheduled.
 -      UtilWaitThread.sleep(3000);
++      Thread.sleep(3000);
+ 
        // wait for call() to be called
        callStarted.await();
        assertEquals(IN_PROGRESS, getTxStatus(zk, txid));
@@@ -208,9 -271,9 +268,8 @@@
  
    @Test
    public void testCancelWhileNew() throws Exception {
--    final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + 
Constants.ZFATE, zk);
--    final AgeOffStore<Manager> store =
--        new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
++    final ZooStore<Manager> zooStore = new ZooStore<>(ZK_ROOT + 
Constants.ZFATE, zk);
++    final AgeOffStore<Manager> store = new AgeOffStore<>(zooStore, 3000, 
System::currentTimeMillis);
  
      Manager manager = createMock(Manager.class);
      ServerContext sctx = createMock(ServerContext.class);
@@@ -219,14 -282,15 +278,13 @@@
      expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
      replay(manager, sctx);
  
 -    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString);
 +    ConfigurationCopy config = new ConfigurationCopy();
 +    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
 +    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-     Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString, config);
++    Fate<Manager> fate = new Fate<>(manager, store, TraceRepo::toLogString, 
config);
      try {
 -      ConfigurationCopy config = new ConfigurationCopy();
 -      config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
 -      config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
 -      fate.startTransactionRunners(config);
--
        // Wait for the transaction runner to be scheduled.
 -      UtilWaitThread.sleep(3000);
 +      Thread.sleep(3000);
  
        callStarted = new CountDownLatch(1);
        finishCall = new CountDownLatch(1);
@@@ -249,10 -313,10 +307,9 @@@
    }
  
    @Test
 -  public void testCancelWhileSubmittedNotRunning() throws Exception {
 -    final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + 
Constants.ZFATE, zk);
 -    final AgeOffStore<Manager> store =
 -        new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
 +  public void testCancelWhileSubmittedAndRunning() throws Exception {
-     final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + 
Constants.ZFATE, zk);
-     final AgeOffStore<Manager> store =
-         new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
++    final ZooStore<Manager> zooStore = new ZooStore<>(ZK_ROOT + 
Constants.ZFATE, zk);
++    final AgeOffStore<Manager> store = new AgeOffStore<>(zooStore, 3000, 
System::currentTimeMillis);
  
      Manager manager = createMock(Manager.class);
      ServerContext sctx = createMock(ServerContext.class);
@@@ -261,14 -325,48 +318,14 @@@
      expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
      replay(manager, sctx);
  
 -    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString);
      ConfigurationCopy config = new ConfigurationCopy();
      config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
 -
 -    // Notice that we did not start the transaction runners
 -
 -    // Wait for the transaction runner to be scheduled.
 -    UtilWaitThread.sleep(3000);
 -
 -    callStarted = new CountDownLatch(1);
 -    finishCall = new CountDownLatch(1);
 -
 -    long txid = fate.startTransaction();
 -    LOG.debug("Starting test testCancelWhileSubmitted with {}", 
FateTxId.formatTid(txid));
 -    assertEquals(NEW, getTxStatus(zk, txid));
 -    fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), 
true, "Test Op");
 -    assertEquals(SUBMITTED, getTxStatus(zk, txid));
 -    assertTrue(fate.cancel(txid));
 -  }
 -
 -  @Test
 -  public void testCancelWhileSubmittedAndRunning() throws Exception {
 -    final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + 
Constants.ZFATE, zk);
 -    final AgeOffStore<Manager> store =
 -        new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
 -
 -    Manager manager = createMock(Manager.class);
 -    ServerContext sctx = createMock(ServerContext.class);
 -    expect(manager.getContext()).andReturn(sctx).anyTimes();
 -    expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes();
 -    expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
 -    replay(manager, sctx);
 -
 -    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString);
 +    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-     Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString, config);
++    Fate<Manager> fate = new Fate<>(manager, store, TraceRepo::toLogString, 
config);
      try {
 -      ConfigurationCopy config = new ConfigurationCopy();
 -      config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
 -      config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
 -      fate.startTransactionRunners(config);
  
        // Wait for the transaction runner to be scheduled.
 -      UtilWaitThread.sleep(3000);
 +      Thread.sleep(3000);
  
        callStarted = new CountDownLatch(1);
        finishCall = new CountDownLatch(1);
@@@ -293,9 -391,9 +350,8 @@@
  
    @Test
    public void testCancelWhileInCall() throws Exception {
--    final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + 
Constants.ZFATE, zk);
--    final AgeOffStore<Manager> store =
--        new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
++    final ZooStore<Manager> zooStore = new ZooStore<>(ZK_ROOT + 
Constants.ZFATE, zk);
++    final AgeOffStore<Manager> store = new AgeOffStore<>(zooStore, 3000, 
System::currentTimeMillis);
  
      Manager manager = createMock(Manager.class);
      ServerContext sctx = createMock(ServerContext.class);
@@@ -304,14 -402,11 +360,14 @@@
      expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
      replay(manager, sctx);
  
 -    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString);
 +    ConfigurationCopy config = new ConfigurationCopy();
 +    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
 +    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-     Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString, config);
++    Fate<Manager> fate = new Fate<>(manager, store, TraceRepo::toLogString, 
config);
      try {
 -      ConfigurationCopy config = new ConfigurationCopy();
 -      config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
 -      config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
 +
 +      // Wait for the transaction runner to be scheduled.
 +      Thread.sleep(3000);
  
        callStarted = new CountDownLatch(1);
        finishCall = new CountDownLatch(1);
@@@ -321,6 -416,11 +377,7 @@@
        assertEquals(NEW, getTxStatus(zk, txid));
        fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), 
true, "Test Op");
        assertEquals(SUBMITTED, getTxStatus(zk, txid));
+ 
 -      fate.startTransactionRunners(config);
 -      // Wait for the transaction runner to be scheduled.
 -      UtilWaitThread.sleep(3000);
 -
        // wait for call() to be called
        callStarted.await();
        // cancel the transaction
@@@ -331,6 -431,69 +388,67 @@@
  
    }
  
+   @Test
+   public void testRepoFails() throws Exception {
+     /*
+      * This test ensures that when an exception occurs in a Repo's call() or 
isReady() method,
+      * undo() will be called back up the chain of Repos and in the correct 
order. The test works as
+      * follows: 1) Repo1 is called and returns Repo2, 2) Repo2 is called and 
returns Repo3, 3) Repo3
+      * is called and throws an exception (in call() or isReady()). It is then 
expected that: 1)
+      * undo() is called on Repo3, 2) undo() is called on Repo2, 3) undo() is 
called on Repo1
+      */
 -    final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + 
Constants.ZFATE, zk);
 -    final AgeOffStore<Manager> store =
 -        new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
++    final ZooStore<Manager> zooStore = new ZooStore<>(ZK_ROOT + 
Constants.ZFATE, zk);
++    final AgeOffStore<Manager> store = new AgeOffStore<>(zooStore, 3000, 
System::currentTimeMillis);
+ 
+     Manager manager = createMock(Manager.class);
+     ServerContext sctx = createMock(ServerContext.class);
+     expect(manager.getContext()).andReturn(sctx).anyTimes();
+     expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes();
+     expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
+     replay(manager, sctx);
+ 
 -    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString);
++    ConfigurationCopy config = new ConfigurationCopy();
++    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
++    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
++    Fate<Manager> fate = new Fate<>(manager, store, TraceRepo::toLogString, 
config);
+     try {
 -      ConfigurationCopy config = new ConfigurationCopy();
 -      config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
 -      config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
 -      fate.startTransactionRunners(config);
+ 
+       // Wait for the transaction runner to be scheduled.
 -      UtilWaitThread.sleep(3000);
++      Thread.sleep(3000);
+ 
+       List<String> expectedUndoOrder = List.of("OP3", "OP2", "OP1");
+       /*
+        * Test exception in call()
+        */
+       undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS);
+       long txid = fate.startTransaction();
+       assertEquals(NEW, getTxStatus(zk, txid));
+       fate.seedTransaction("TestOperationFails", txid,
+           new TestOperationFails(1, ExceptionLocation.CALL), false, "Test Op 
Fails");
+       // Wait for all the undo() calls to complete
+       undoLatch.await();
+       assertEquals(expectedUndoOrder, TestOperationFails.undoOrder);
+       assertEquals(FAILED, fate.waitForCompletion(txid));
+       assertTrue(fate.getException(txid).getMessage().contains("call() 
failed"));
+       /*
+        * Test exception in isReady()
+        */
+       TestOperationFails.undoOrder = new ArrayList<>();
+       undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS);
+       txid = fate.startTransaction();
+       assertEquals(NEW, getTxStatus(zk, txid));
+       fate.seedTransaction("TestOperationFails", txid,
+           new TestOperationFails(1, ExceptionLocation.IS_READY), false, "Test 
Op Fails");
+       // Wait for all the undo() calls to complete
+       undoLatch.await();
+       assertEquals(expectedUndoOrder, TestOperationFails.undoOrder);
+       assertEquals(FAILED, fate.waitForCompletion(txid));
+       assertTrue(fate.getException(txid).getMessage().contains("isReady() 
failed"));
+     } finally {
+       fate.shutdown();
+     }
+   }
+ 
    private static void inCall() throws InterruptedException {
      // signal that call started
      callStarted.countDown();

Reply via email to