This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 552c7a15b6 Modified Fate TransactionRunner to exit normally when shut down (#6021)
552c7a15b6 is described below

commit 552c7a15b66ed76e7e07e475d9e8e1b6b6be492f
Author: Dave Marion <[email protected]>
AuthorDate: Thu Dec 18 15:57:17 2025 -0500

    Modified Fate TransactionRunner to exit normally when shut down (#6021)
    
    Modified the Fate TransactionRunner threads to allow them to
    exit their run loop normally when an InterruptedException is
    raised due to Fate being shut down.
    
    Closes #6014
---
 .../java/org/apache/accumulo/core/fate/Fate.java   | 62 +++++++++++++++++--
 .../accumulo/test/fate/zookeeper/FateIT.java       | 70 +++++++++++++++++++++-
 2 files changed, 125 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 906abc24b8..684fd69cc0 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -74,6 +74,24 @@ public class Fate<T> {
 
   private class TransactionRunner implements Runnable {
 
+    private boolean isInterruptedException(Throwable e) { // searches e, suppressed, and causes
+      if (e == null) { // nothing to inspect (e.g. end of the cause chain)
+        return false;
+      }
+
+      if (e instanceof InterruptedException) {
+        return true;
+      }
+
+      for (Throwable suppressed : e.getSuppressed()) { // e.g. added by try-with-resources
+        if (isInterruptedException(suppressed)) {
+          return true;
+        }
+      }
+
+      return isInterruptedException(e.getCause()); // NOTE(review): no guard against cycles
+    }
+
     @Override
     public void run() {
       while (keepRunning.get()) {
@@ -99,14 +117,31 @@ public class Fate<T> {
                   store.setStatus(tid, IN_PROGRESS);
                 }
                 op = executeCall(tid, op);
+                // It's possible that a Fate operation impl
+                // may not do the right thing with an
+                // InterruptedException.
+                if (Thread.currentThread().isInterrupted()) {
+                  throw new InterruptedException("Fate Transaction Runner 
thread interrupted");
+                }
               } else {
                 continue;
               }
-
             } catch (Exception e) {
-              blockIfHadoopShutdown(tid, e);
-              transitionToFailed(tid, e);
-              continue;
+              if (!isInterruptedException(e)) {
+                blockIfHadoopShutdown(tid, e);
+                transitionToFailed(tid, e);
+                continue;
+              } else {
+                if (keepRunning.get()) {
+                  throw e;
+                } else {
+                  // If we are shutting down then Fate.shutdown was called
+                  // and ExecutorService.shutdownNow was called resulting
+                  // in this exception. We will exit at the top of the loop.
+                  Thread.interrupted();
+                  continue;
+                }
+              }
             }
 
             if (op == null) {
@@ -130,7 +165,19 @@ public class Fate<T> {
             }
           }
         } catch (Exception e) {
-          runnerLog.error("Uncaught exception in FATE runner thread.", e);
+          if (isInterruptedException(e)) {
+            if (keepRunning.get()) {
+              runnerLog.error("Uncaught InterruptedException in FATE runner 
thread.", e);
+            } else {
+              // If we are shutting down then Fate.shutdown was called
+              // and ExecutorService.shutdownNow was called resulting
+              // in this exception. We will exit at the top of the loop,
+              // so continue this loop iteration normally.
+              Thread.interrupted();
+            }
+          } else {
+            runnerLog.error("Uncaught exception in FATE runner thread.", e);
+          }
         } finally {
           if (tid != null) {
             store.unreserve(tid, deferTime, TimeUnit.MILLISECONDS);
@@ -198,6 +245,7 @@ public class Fate<T> {
     }
 
     private void doCleanUp(long tid) {
+      log.debug("Cleaning up {}", tid);
       Boolean autoClean = (Boolean) store.getTransactionInfo(tid, 
TxInfo.AUTO_CLEAN);
       if (autoClean != null && autoClean) {
         store.delete(tid);
@@ -261,6 +309,7 @@ public class Fate<T> {
       ScheduledThreadPoolExecutor serverGeneralScheduledThreadPool) {
     final ThreadPoolExecutor pool = 
ThreadPools.getServerThreadPools().createExecutorService(conf,
         Property.MANAGER_FATE_THREADPOOL_SIZE, true);
+    log.debug("Starting Fate Transaction Runner pool with {} threads", 
pool.getCorePoolSize());
     ThreadPools
         
.watchCriticalScheduledTask(serverGeneralScheduledThreadPool.scheduleWithFixedDelay(()
 -> {
           // resize the pool if the property changed
@@ -421,6 +470,9 @@ public class Fate<T> {
    * Flags that FATE threadpool to clear out and end. Does not actively stop 
running FATE processes.
    */
   public void shutdown(boolean wait) {
+    log.info("Shutdown called on Fate, waiting: {}", wait);
+    // important this is set before shutdownNow is called as the background
+    // threads will check this to see if shutdown related errors should be 
ignored.
     keepRunning.set(false);
     if (executor == null) {
       return;
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
index be5e9d7809..e6029d5649 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
@@ -31,6 +31,7 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -42,6 +43,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
@@ -190,6 +192,7 @@ public class FateIT {
   private static CountDownLatch callStarted;
   private static CountDownLatch finishCall;
   private static CountDownLatch undoLatch;
+  private static final AtomicReference<Throwable> interruptedException = new AtomicReference<>();
 
   Fate<Manager> fate;
 
@@ -211,6 +214,7 @@ public class FateIT {
 
   @BeforeEach
   public void beforeEach() throws Exception {
+    interruptedException.set(null);
     final ZooStore<Manager> zooStore = new ZooStore<>(ZK_ROOT + 
Constants.ZFATE, zk, zc);
     final AgeOffStore<Manager> store = new AgeOffStore<>(zooStore, 3000, 
System::currentTimeMillis);
 
@@ -448,11 +452,73 @@ public class FateIT {
     assertTrue(fate.getException(txid).getMessage().contains("isReady() 
failed"));
   }
 
+  @Test
+  public void testShutdownDoesNotFailTx() throws Exception { // shutdown must not fail the tx
+    ConfigurationCopy config = new ConfigurationCopy();
+    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+
+    // NOTE(review): runners start further below; confirm this pause is still needed.
+    UtilWaitThread.sleep(3000);
+
+    callStarted = new CountDownLatch(1);
+    finishCall = new CountDownLatch(1);
+
+    long txid = fate.startTransaction();
+    assertEquals(TStatus.NEW, getTxStatus(zk, txid));
+    fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), 
true, "Test Op");
+    assertEquals(TStatus.SUBMITTED, getTxStatus(zk, txid));
+
+    fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
+
+    // wait for call() to be called
+    callStarted.await();
+    assertEquals(IN_PROGRESS, getTxStatus(zk, txid));
+
+    // shut down fate while the op is still blocked waiting on finishCall
+    fate.shutdown(true);
+
+    // wait for inCall() to record the InterruptedException, then clear the marker
+    Wait.waitFor(() -> interruptedException.get() != null);
+    interruptedException.set(null);
+
+    // restart fate via beforeEach(); the tx must still be IN_PROGRESS, not failed
+    beforeEach();
+    assertEquals(IN_PROGRESS, getTxStatus(zk, txid));
+
+    // Restarting the transaction runners will retry the in-progress
+    // transaction. Reset the CountDownLatches to confirm.
+    callStarted = new CountDownLatch(1);
+    finishCall = new CountDownLatch(1);
+    fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
+    callStarted.await();
+    assertEquals(IN_PROGRESS, getTxStatus(zk, txid));
+    finishCall.countDown();
+
+    // This should complete normally, cleaning up the tx and deleting it from ZK
+    while (true) { // NOTE(review): no timeout; exits once getTxStatus throws NoNodeException
+      try {
+        getTxStatus(zk, txid);
+        Thread.sleep(100);
+        continue;
+      } catch (KeeperException.NoNodeException e) {
+        break;
+      }
+    }
+    assertNull(interruptedException.get()); // the retried call() must not be interrupted
+  }
+
   private static void inCall() throws InterruptedException {
     // signal that call started
     callStarted.countDown();
-    // wait for the signal to exit the method
-    finishCall.await();
+    try {
+      // wait for the signal to exit the method
+      finishCall.await();
+    } catch (InterruptedException e) { // e.g. Fate.shutdown interrupting the runner
+      LOG.debug("InterruptedException occurred inCall.");
+      interruptedException.set(e); // record it so the test can observe the interrupt
+      throw e; // rethrow so the Fate runner still sees the interrupt
+    }
   }
 
   /*

Reply via email to