This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 552c7a15b6 Modified Fate TransactionRunner to exit normally when shut
down (#6021)
552c7a15b6 is described below
commit 552c7a15b66ed76e7e07e475d9e8e1b6b6be492f
Author: Dave Marion <[email protected]>
AuthorDate: Thu Dec 18 15:57:17 2025 -0500
Modified Fate TransactionRunner to exit normally when shut down (#6021)
Modified the Fate TransactionRunner threads to allow them to
exit their run loop normally when an InterruptedException is
raised due to Fate being shut down.
Closes #6014
---
.../java/org/apache/accumulo/core/fate/Fate.java | 62 +++++++++++++++++--
.../accumulo/test/fate/zookeeper/FateIT.java | 70 +++++++++++++++++++++-
2 files changed, 125 insertions(+), 7 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 906abc24b8..684fd69cc0 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -74,6 +74,24 @@ public class Fate<T> {
private class TransactionRunner implements Runnable {
+ private boolean isInterruptedException(Throwable e) { // true if e, any suppressed exception, or any cause is an InterruptedException
+ if (e == null) { // null argument or end of a cause chain: nothing found on this path
+ return false;
+ }
+ // direct hit: the throwable itself is the interrupt
+ if (e instanceof InterruptedException) {
+ return true;
+ }
+ // otherwise search each suppressed exception recursively
+ for (Throwable suppressed : e.getSuppressed()) {
+ if (isInterruptedException(suppressed)) {
+ return true;
+ }
+ }
+ // finally follow the cause chain; NOTE(review): no visited set is kept, so a cyclic cause/suppressed graph would recurse without bound - confirm none can reach here
+ return isInterruptedException(e.getCause());
+ }
+
@Override
public void run() {
while (keepRunning.get()) {
@@ -99,14 +117,31 @@ public class Fate<T> {
store.setStatus(tid, IN_PROGRESS);
}
op = executeCall(tid, op);
+ // It's possible that a Fate operation impl
+ // may not do the right thing with an
+ // InterruptedException.
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException("Fate Transaction Runner
thread interrupted");
+ }
} else {
continue;
}
-
} catch (Exception e) {
- blockIfHadoopShutdown(tid, e);
- transitionToFailed(tid, e);
- continue;
+ if (!isInterruptedException(e)) {
+ blockIfHadoopShutdown(tid, e);
+ transitionToFailed(tid, e);
+ continue;
+ } else {
+ if (keepRunning.get()) {
+ throw e;
+ } else {
+ // If we are shutting down then Fate.shutdown was called
+ // and ExecutorService.shutdownNow was called resulting
+ // in this exception. We will exit at the top of the loop.
+ Thread.interrupted();
+ continue;
+ }
+ }
}
if (op == null) {
@@ -130,7 +165,19 @@ public class Fate<T> {
}
}
} catch (Exception e) {
- runnerLog.error("Uncaught exception in FATE runner thread.", e);
+ if (isInterruptedException(e)) {
+ if (keepRunning.get()) {
+ runnerLog.error("Uncaught InterruptedException in FATE runner
thread.", e);
+ } else {
+ // If we are shutting down then Fate.shutdown was called
+ // and ExecutorService.shutdownNow was called resulting
+ // in this exception. We will exit at the top of the loop,
+ // so continue this loop iteration normally.
+ Thread.interrupted();
+ }
+ } else {
+ runnerLog.error("Uncaught exception in FATE runner thread.", e);
+ }
} finally {
if (tid != null) {
store.unreserve(tid, deferTime, TimeUnit.MILLISECONDS);
@@ -198,6 +245,7 @@ public class Fate<T> {
}
private void doCleanUp(long tid) {
+ log.debug("Cleaning up {}", tid);
Boolean autoClean = (Boolean) store.getTransactionInfo(tid,
TxInfo.AUTO_CLEAN);
if (autoClean != null && autoClean) {
store.delete(tid);
@@ -261,6 +309,7 @@ public class Fate<T> {
ScheduledThreadPoolExecutor serverGeneralScheduledThreadPool) {
final ThreadPoolExecutor pool =
ThreadPools.getServerThreadPools().createExecutorService(conf,
Property.MANAGER_FATE_THREADPOOL_SIZE, true);
+ log.debug("Starting Fate Transaction Runner pool with {} threads",
pool.getCorePoolSize());
ThreadPools
.watchCriticalScheduledTask(serverGeneralScheduledThreadPool.scheduleWithFixedDelay(()
-> {
// resize the pool if the property changed
@@ -421,6 +470,9 @@ public class Fate<T> {
* Flags that FATE threadpool to clear out and end. Does not actively stop
running FATE processes.
*/
public void shutdown(boolean wait) {
+ log.info("Shutdown called on Fate, waiting: {}", wait);
+ // important this is set before shutdownNow is called as the background
+ // threads will check this to see if shutdown related errors should be
ignored.
keepRunning.set(false);
if (executor == null) {
return;
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
index be5e9d7809..e6029d5649 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
@@ -31,6 +31,7 @@ import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -42,6 +43,7 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
@@ -190,6 +192,7 @@ public class FateIT {
private static CountDownLatch callStarted;
private static CountDownLatch finishCall;
private static CountDownLatch undoLatch;
+ private static AtomicReference<Throwable> interruptedException = new
AtomicReference<>();
Fate<Manager> fate;
@@ -211,6 +214,7 @@ public class FateIT {
@BeforeEach
public void beforeEach() throws Exception {
+ interruptedException.set(null);
final ZooStore<Manager> zooStore = new ZooStore<>(ZK_ROOT +
Constants.ZFATE, zk, zc);
final AgeOffStore<Manager> store = new AgeOffStore<>(zooStore, 3000,
System::currentTimeMillis);
@@ -448,11 +452,73 @@ public class FateIT {
assertTrue(fate.getException(txid).getMessage().contains("isReady()
failed"));
}
+ @Test
+ public void testShutdownDoesNotFailTx() throws Exception { // checks a tx interrupted by fate.shutdown stays IN_PROGRESS and is removed after a restart
+ ConfigurationCopy config = new ConfigurationCopy();
+ config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+ config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+
+ // Wait for the transaction runner to be scheduled.
+ UtilWaitThread.sleep(3000); // NOTE(review): fixed 3s pause; startTransactionRunners is only called further down - confirm what this waits for
+
+ callStarted = new CountDownLatch(1);
+ finishCall = new CountDownLatch(1);
+
+ long txid = fate.startTransaction();
+ assertEquals(TStatus.NEW, getTxStatus(zk, txid));
+ fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID),
true, "Test Op");
+ assertEquals(TStatus.SUBMITTED, getTxStatus(zk, txid));
+
+ fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
+
+ // wait for call() to be called
+ callStarted.await();
+ assertEquals(IN_PROGRESS, getTxStatus(zk, txid));
+
+ // shutdown fate
+ fate.shutdown(true);
+
+ // wait until inCall() has recorded the InterruptedException it received, then clear it for the second run
+ Wait.waitFor(() -> interruptedException.get() != null);
+ interruptedException.set(null);
+
+ // restart fate
+ beforeEach();
+ assertEquals(IN_PROGRESS, getTxStatus(zk, txid));
+
+ // Restarting the transaction runners will retry the in-progress
+ // transaction. Reset the CountDownLatches to confirm.
+ callStarted = new CountDownLatch(1);
+ finishCall = new CountDownLatch(1);
+ fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
+ callStarted.await();
+ assertEquals(IN_PROGRESS, getTxStatus(zk, txid));
+ finishCall.countDown();
+
+ // This should complete normally, cleaning up the tx and deleting it from
ZK
+ while (true) { // polls every 100 ms until getTxStatus throws NoNodeException; there is no timeout in this loop
+ try {
+ getTxStatus(zk, txid);
+ Thread.sleep(100);
+ continue;
+ } catch (KeeperException.NoNodeException e) {
+ break;
+ }
+ }
+ assertNull(interruptedException.get()); // inCall() must not have recorded an interrupt during the retried run
+ }
+
private static void inCall() throws InterruptedException {
// signal that call started
callStarted.countDown();
- // wait for the signal to exit the method
- finishCall.await();
+ try {
+ // wait for the signal to exit the method
+ finishCall.await();
+ } catch (InterruptedException e) {
+ LOG.debug("InterruptedException occurred inCall.");
+ interruptedException.set(e); // record the interrupt so the test thread can observe it
+ throw e; // rethrow so the caller still sees the interrupt
+ }
}
/*