This is an automated email from the ASF dual-hosted git repository. edcoleman pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit e3d62043051d22f57b7bd71760bec52352eb160c Merge: 4886e821ed 00bac7ae56 Author: Ed Coleman <edcole...@apache.org> AuthorDate: Wed Feb 21 22:17:04 2024 +0000 Merge remote-tracking branch 'upstream/2.1' - merge FateIT into main - includes minor quality check fixes .../accumulo/test/fate/zookeeper/FateIT.java | 160 ++++++++++++++++++--- 1 file changed, 139 insertions(+), 21 deletions(-) diff --cc test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java index d4da1ebae4,2dde8fabca..a392313bd0 --- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java @@@ -121,12 -177,17 +176,17 @@@ public class FateIT private static ZooKeeperTestingServer szk = null; private static ZooReaderWriter zk = null; -- private static final String ZK_ROOT = "/accumulo/" + UUID.randomUUID().toString(); ++ private static final String ZK_ROOT = "/accumulo/" + UUID.randomUUID(); private static final NamespaceId NS = NamespaceId.of("testNameSpace"); private static final TableId TID = TableId.of("testTable"); private static CountDownLatch callStarted; private static CountDownLatch finishCall; + private static CountDownLatch undoLatch; + + private enum ExceptionLocation { + CALL, IS_READY - }; ++ } @BeforeAll public static void setup() throws Exception { @@@ -148,9 -209,9 +208,8 @@@ @Timeout(30) public void testTransactionStatus() throws Exception { -- final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + Constants.ZFATE, zk); -- final AgeOffStore<Manager> store = -- new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis); ++ final ZooStore<Manager> zooStore = new ZooStore<>(ZK_ROOT + Constants.ZFATE, zk); ++ final AgeOffStore<Manager> store = new AgeOffStore<>(zooStore, 3000, System::currentTimeMillis); Manager manager = createMock(Manager.class); ServerContext sctx = createMock(ServerContext.class); @@@ -159,15 -220,12 +218,12 @@@ expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); replay(manager, sctx); - Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString); + ConfigurationCopy config = new ConfigurationCopy(); + config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); + config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString, config); ++ Fate<Manager> fate = new Fate<>(manager, store, TraceRepo::toLogString, config); try { - ConfigurationCopy config = new ConfigurationCopy(); - config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); - config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - // Wait for the transaction runner to be scheduled. - Thread.sleep(3000); - callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); @@@ -175,6 -233,11 +231,10 @@@ assertEquals(TStatus.NEW, getTxStatus(zk, txid)); fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op"); assertEquals(TStatus.SUBMITTED, getTxStatus(zk, txid)); + - fate.startTransactionRunners(config); + // Wait for the transaction runner to be scheduled. - UtilWaitThread.sleep(3000); ++ Thread.sleep(3000); + // wait for call() to be called callStarted.await(); assertEquals(IN_PROGRESS, getTxStatus(zk, txid)); @@@ -208,9 -271,9 +268,8 @@@ @Test public void testCancelWhileNew() throws Exception { -- final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + Constants.ZFATE, zk); -- final AgeOffStore<Manager> store = -- new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis); ++ final ZooStore<Manager> zooStore = new ZooStore<>(ZK_ROOT + Constants.ZFATE, zk); ++ final AgeOffStore<Manager> store = new AgeOffStore<>(zooStore, 3000, System::currentTimeMillis); Manager manager = createMock(Manager.class); ServerContext sctx = createMock(ServerContext.class); @@@ -219,14 -282,15 +278,13 @@@ expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); replay(manager, sctx); - Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString); + ConfigurationCopy config = new ConfigurationCopy(); + config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); + config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString, config); ++ Fate<Manager> fate = new Fate<>(manager, store, TraceRepo::toLogString, config); try { - ConfigurationCopy config = new ConfigurationCopy(); - config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); - config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - fate.startTransactionRunners(config); -- // Wait for the transaction runner to be scheduled. - UtilWaitThread.sleep(3000); + Thread.sleep(3000); callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); @@@ -249,10 -313,10 +307,9 @@@ } @Test - public void testCancelWhileSubmittedNotRunning() throws Exception { - final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + Constants.ZFATE, zk); - final AgeOffStore<Manager> store = - new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis); + public void testCancelWhileSubmittedAndRunning() throws Exception { - final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + Constants.ZFATE, zk); - final AgeOffStore<Manager> store = - new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis); ++ final ZooStore<Manager> zooStore = new ZooStore<>(ZK_ROOT + Constants.ZFATE, zk); ++ final AgeOffStore<Manager> store = new AgeOffStore<>(zooStore, 3000, System::currentTimeMillis); Manager manager = createMock(Manager.class); ServerContext sctx = createMock(ServerContext.class); @@@ -261,14 -325,48 +318,14 @@@ expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); replay(manager, sctx); - Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString); ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); - - // Notice that we did not start the transaction runners - - // Wait for the transaction runner to be scheduled. - UtilWaitThread.sleep(3000); - - callStarted = new CountDownLatch(1); - finishCall = new CountDownLatch(1); - - long txid = fate.startTransaction(); - LOG.debug("Starting test testCancelWhileSubmitted with {}", FateTxId.formatTid(txid)); - assertEquals(NEW, getTxStatus(zk, txid)); - fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op"); - assertEquals(SUBMITTED, getTxStatus(zk, txid)); - assertTrue(fate.cancel(txid)); - } - - @Test - public void testCancelWhileSubmittedAndRunning() throws Exception { - final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + Constants.ZFATE, zk); - final AgeOffStore<Manager> store = - new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis); - - Manager manager = createMock(Manager.class); - ServerContext sctx = createMock(ServerContext.class); - expect(manager.getContext()).andReturn(sctx).anyTimes(); - expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); - expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); - replay(manager, sctx); - - Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString); + config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString, config); ++ Fate<Manager> fate = new Fate<>(manager, store, TraceRepo::toLogString, config); try { - ConfigurationCopy config = new ConfigurationCopy(); - config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); - config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - fate.startTransactionRunners(config); // Wait for the transaction runner to be scheduled. - UtilWaitThread.sleep(3000); + Thread.sleep(3000); callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); @@@ -293,9 -391,9 +350,8 @@@ @Test public void testCancelWhileInCall() throws Exception { -- final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + Constants.ZFATE, zk); -- final AgeOffStore<Manager> store = -- new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis); ++ final ZooStore<Manager> zooStore = new ZooStore<>(ZK_ROOT + Constants.ZFATE, zk); ++ final AgeOffStore<Manager> store = new AgeOffStore<>(zooStore, 3000, System::currentTimeMillis); Manager manager = createMock(Manager.class); ServerContext sctx = createMock(ServerContext.class); @@@ -304,14 -402,11 +360,14 @@@ expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); replay(manager, sctx); - Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString); + ConfigurationCopy config = new ConfigurationCopy(); + config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); + config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString, config); ++ Fate<Manager> fate = new Fate<>(manager, store, TraceRepo::toLogString, config); try { - ConfigurationCopy config = new ConfigurationCopy(); - config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); - config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); + + // Wait for the transaction runner to be scheduled. + Thread.sleep(3000); callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); @@@ -321,6 -416,11 +377,7 @@@ assertEquals(NEW, getTxStatus(zk, txid)); fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op"); assertEquals(SUBMITTED, getTxStatus(zk, txid)); + - fate.startTransactionRunners(config); - // Wait for the transaction runner to be scheduled. - UtilWaitThread.sleep(3000); - // wait for call() to be called callStarted.await(); // cancel the transaction @@@ -331,6 -431,69 +388,67 @@@ } + @Test + public void testRepoFails() throws Exception { + /* + * This test ensures that when an exception occurs in a Repo's call() or isReady() methods, that + * undo() will be called back up the chain of Repo's and in the correct order. The test works as + * follows: 1) Repo1 is called and returns Repo2, 2) Repo2 is called and returns Repo3, 3) Repo3 + * is called and throws an exception (in call() or isReady()). It is then expected that: 1) + * undo() is called on Repo3, 2) undo() is called on Repo2, 3) undo() is called on Repo1 + */ - final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + Constants.ZFATE, zk); - final AgeOffStore<Manager> store = - new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis); ++ final ZooStore<Manager> zooStore = new ZooStore<>(ZK_ROOT + Constants.ZFATE, zk); ++ final AgeOffStore<Manager> store = new AgeOffStore<>(zooStore, 3000, System::currentTimeMillis); + + Manager manager = createMock(Manager.class); + ServerContext sctx = createMock(ServerContext.class); + expect(manager.getContext()).andReturn(sctx).anyTimes(); + expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); + expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); + replay(manager, sctx); + - Fate<Manager> fate = new Fate<Manager>(manager, store, TraceRepo::toLogString); ++ ConfigurationCopy config = new ConfigurationCopy(); ++ config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); ++ config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); ++ Fate<Manager> fate = new Fate<>(manager, store, TraceRepo::toLogString, config); + try { - ConfigurationCopy config = new ConfigurationCopy(); - config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); - config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - fate.startTransactionRunners(config); + + // Wait for the transaction runner to be scheduled. - UtilWaitThread.sleep(3000); ++ Thread.sleep(3000); + + List<String> expectedUndoOrder = List.of("OP3", "OP2", "OP1"); + /* + * Test exception in call() + */ + undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS); + long txid = fate.startTransaction(); + assertEquals(NEW, getTxStatus(zk, txid)); + fate.seedTransaction("TestOperationFails", txid, + new TestOperationFails(1, ExceptionLocation.CALL), false, "Test Op Fails"); + // Wait for all the undo() calls to complete + undoLatch.await(); + assertEquals(expectedUndoOrder, TestOperationFails.undoOrder); + assertEquals(FAILED, fate.waitForCompletion(txid)); + assertTrue(fate.getException(txid).getMessage().contains("call() failed")); + /* + * Test exception in isReady() + */ + TestOperationFails.undoOrder = new ArrayList<>(); + undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS); + txid = fate.startTransaction(); + assertEquals(NEW, getTxStatus(zk, txid)); + fate.seedTransaction("TestOperationFails", txid, + new TestOperationFails(1, ExceptionLocation.IS_READY), false, "Test Op Fails"); + // Wait for all the undo() calls to complete + undoLatch.await(); + assertEquals(expectedUndoOrder, TestOperationFails.undoOrder); + assertEquals(FAILED, fate.waitForCompletion(txid)); + assertTrue(fate.getException(txid).getMessage().contains("isReady() failed")); + } finally { + fate.shutdown(); + } + } + private static void inCall() throws InterruptedException { // signal that call started callStarted.countDown();