This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new bd7d82e30b Add a default limit of deferred FATE transactions (#4179)
bd7d82e30b is described below

commit bd7d82e30b1692ece0b8db096eeb68a603c80ef8
Author: Christopher L. Shannon <christopher.l.shan...@gmail.com>
AuthorDate: Sun Jan 21 18:57:41 2024 -0500

    Add a default limit of deferred FATE transactions (#4179)
    
    This commit adds support for limiting the number of FATE transactions
    that can be deferred at the same time. If the number reaches the maximum
    then the overflow flag is set and the deferred map will be cleared so
    that the next run by FATE through the transactions will run all of the
    outstanding operations and not defer. New transactions won't be
    added to the deferred map until the executor runs through the
    transaction list and clears the flag.
    
    This condition should not occur frequently but if it does it will tell
    FATE to try and catch up and work off the backlog of deferred
    transactions.
---
 .../accumulo/core/fate/AbstractFateStore.java      |  55 ++++++-
 .../accumulo/core/fate/ReadOnlyFateStore.java      |  13 ++
 .../org/apache/accumulo/core/fate/ZooStore.java    |   7 +-
 .../accumulo/core/fate/accumulo/AccumuloStore.java |   5 +
 .../apache/accumulo/core/logging/FateLogger.java   |  10 ++
 .../org/apache/accumulo/core/fate/TestStore.java   |   9 ++
 .../java/org/apache/accumulo/test/fate/FateIT.java | 168 ++++++++++++++++++---
 .../test/fate/accumulo/AccumuloFateIT.java         |   7 +-
 .../fate/accumulo/AccumuloStoreReadWriteIT.java    |  90 +++++++++++
 .../test/fate/zookeeper/ZookeeperFateIT.java       |  16 +-
 10 files changed, 351 insertions(+), 29 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
index 89eca7cbf6..f67079b0ef 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@ -51,8 +51,14 @@ public abstract class AbstractFateStore<T> implements 
FateStore<T> {
 
   private static final Logger log = 
LoggerFactory.getLogger(AbstractFateStore.class);
 
+  // Default maximum size of 100,000 transactions before deferral is stopped 
and
+  // all existing transactions are processed immediately again
+  protected static final int DEFAULT_MAX_DEFERRED = 100_000;
+
   protected final Set<Long> reserved;
   protected final Map<Long,Long> deferred;
+  private final int maxDeferred;
+  private final AtomicBoolean deferredOverflow = new AtomicBoolean();
 
   // This is incremented each time a transaction was unreserved that was non 
new
   protected final SignalCount unreservedNonNewCount = new SignalCount();
@@ -61,6 +67,11 @@ public abstract class AbstractFateStore<T> implements 
FateStore<T> {
   protected final SignalCount unreservedRunnableCount = new SignalCount();
 
   public AbstractFateStore() {
+    this(DEFAULT_MAX_DEFERRED);
+  }
+
+  public AbstractFateStore(int maxDeferred) {
+    this.maxDeferred = maxDeferred;
     this.reserved = new HashSet<>();
     this.deferred = new HashMap<>();
   }
@@ -130,6 +141,7 @@ public abstract class AbstractFateStore<T> implements 
FateStore<T> {
 
     while (keepWaiting.get() && seen.get() == 0) {
       final long beforeCount = unreservedRunnableCount.getCount();
+      final boolean beforeDeferredOverflow = deferredOverflow.get();
 
       try (Stream<FateIdStatus> transactions = getTransactions()) {
         transactions.filter(fateIdStatus -> 
isRunnable(fateIdStatus.getStatus()))
@@ -151,6 +163,9 @@ public abstract class AbstractFateStore<T> implements 
FateStore<T> {
             });
       }
 
+      // If deferredOverflow was previously marked true then the deferred map
+      // would have been cleared and seen.get() should be greater than 0 as 
there would
+      // be a lot of transactions to process in the previous run, so we won't 
be sleeping here
       if (seen.get() == 0) {
         if (beforeCount == unreservedRunnableCount.getCount()) {
           long waitTime = 5000;
@@ -169,6 +184,15 @@ public abstract class AbstractFateStore<T> implements 
FateStore<T> {
           }
         }
       }
+
+      // Reset the current state only if it matches the state before the 
execution.
+      // This is to avoid a race condition where the flag was set during the 
run.
+      // We should ensure at least one of the FATE executors will run through 
the
+      // entire transaction list first before clearing the flag and allowing 
more
+      // deferred entries into the map again. In other words, if the before 
state
+      // was false and during the execution at some point it was marked true 
this would
+      // not reset until after the next run
+      deferredOverflow.compareAndSet(beforeDeferredOverflow, false);
     }
   }
 
@@ -204,6 +228,21 @@ public abstract class AbstractFateStore<T> implements 
FateStore<T> {
     }
   }
 
+  @Override
+  public boolean isDeferredOverflow() {
+    return deferredOverflow.get();
+  }
+
+  @Override
+  public int getDeferredCount() {
+    // This method is primarily used right now for unit testing but
+    // if this synchronization becomes an issue we could add an atomic
+    // counter instead to track it separately so we don't need to lock
+    synchronized (AbstractFateStore.this) {
+      return deferred.size();
+    }
+  }
+
   protected abstract Stream<FateIdStatus> getTransactions();
 
   protected abstract TStatus _getStatus(long tid);
@@ -255,8 +294,20 @@ public abstract class AbstractFateStore<T> implements 
FateStore<T> {
         // notify any threads waiting to reserve
         AbstractFateStore.this.notifyAll();
 
-        if (deferTime > 0) {
-          deferred.put(tid, System.nanoTime() + deferTime);
+        // If deferred map has overflowed then skip adding to the deferred map
+        // and clear the map and set the flag. This will cause the next 
execution
+        // of runnable to process all the transactions and to not defer as we
+        // have a large backlog and want to make progress
+        if (deferTime > 0 && !deferredOverflow.get()) {
+          if (deferred.size() >= maxDeferred) {
+            log.info(
+                "Deferred map overflowed with size {}, clearing and setting 
deferredOverflow to true",
+                deferred.size());
+            deferredOverflow.set(true);
+            deferred.clear();
+          } else {
+            deferred.put(tid, System.nanoTime() + deferTime);
+          }
         }
       }
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
index c5f7a9027c..4ddf9afae5 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
@@ -138,4 +138,17 @@ public interface ReadOnlyFateStore<T> {
    * found were passed to the consumer.
    */
   void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer);
+
+  /**
+   * Returns true if the deferred map was cleared and if deferred executions 
are currently disabled
+   * because of too many deferred transactions
+   *
+   * @return true if the map is in a deferred overflow state, else false
+   */
+  boolean isDeferredOverflow();
+
+  /**
+   * @return the current number of transactions that have been deferred
+   */
+  int getDeferredCount();
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
index 31c299cf68..fb8f7ee7ed 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
@@ -57,7 +57,12 @@ public class ZooStore<T> extends AbstractFateStore<T> {
   }
 
   public ZooStore(String path, ZooReaderWriter zk) throws KeeperException, 
InterruptedException {
-    super();
+    this(path, zk, DEFAULT_MAX_DEFERRED);
+  }
+
+  public ZooStore(String path, ZooReaderWriter zk, int maxDeferred)
+      throws KeeperException, InterruptedException {
+    super(maxDeferred);
     this.path = path;
     this.zk = zk;
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
index 4e81065f0c..a31e081e7e 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
@@ -58,6 +58,11 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
       com.google.common.collect.Range.closed(1, maxRepos);
 
   public AccumuloStore(ClientContext context, String tableName) {
+    this(context, tableName, DEFAULT_MAX_DEFERRED);
+  }
+
+  public AccumuloStore(ClientContext context, String tableName, int 
maxDeferred) {
+    super(maxDeferred);
     this.context = Objects.requireNonNull(context);
     this.tableName = Objects.requireNonNull(tableName);
   }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java 
b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
index ffd854bad4..189df12362 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
@@ -124,6 +124,16 @@ public class FateLogger {
         store.runnable(keepWaiting, idConsumer);
       }
 
+      @Override
+      public int getDeferredCount() {
+        return store.getDeferredCount();
+      }
+
+      @Override
+      public boolean isDeferredOverflow() {
+        return store.isDeferredOverflow();
+      }
+
       @Override
       public long create() {
         long tid = store.create();
diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java 
b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index c07f51662d..5df2e0fa0a 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@ -196,4 +196,13 @@ public class TestStore implements FateStore<String> {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public int getDeferredCount() {
+    return 0;
+  }
+
+  @Override
+  public boolean isDeferredOverflow() {
+    return false;
+  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
index e424be4e87..d1797a42f9 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
@@ -28,7 +28,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
@@ -97,6 +101,54 @@ public abstract class FateIT extends SharedMiniClusterBase {
     }
   }
 
+  /**
+   * Test Repo that allows configuring a delay time to be returned in 
isReady().
+   */
+  public static class DeferredTestRepo implements Repo<TestEnv> {
+    private static final long serialVersionUID = 1L;
+
+    private final String data;
+
+    // These are static as we don't want to serialize them and they should
+    // be shared across all instances during the test
+    private static final AtomicInteger executedCalls = new AtomicInteger();
+    private static final AtomicLong delay = new AtomicLong();
+    private static final CountDownLatch callLatch = new CountDownLatch(1);
+
+    public DeferredTestRepo(String data) {
+      this.data = data;
+    }
+
+    @Override
+    public long isReady(long tid, TestEnv environment) {
+      LOG.debug("Fate {} delayed {}", tid, delay.get());
+      return delay.get();
+    }
+
+    @Override
+    public String getName() {
+      return "TestRepo_" + data;
+    }
+
+    @Override
+    public Repo<TestEnv> call(long tid, TestEnv environment) throws Exception {
+      callLatch.await();
+      LOG.debug("Executing call {}, total executed {}", 
FateTxId.formatTid(tid),
+          executedCalls.incrementAndGet());
+      return null;
+    }
+
+    @Override
+    public void undo(long tid, TestEnv environment) {
+
+    }
+
+    @Override
+    public String getReturn() {
+      return data + "_ret";
+    }
+  }
+
   @Test
   @Timeout(30)
   public void testTransactionStatus() throws Exception {
@@ -105,11 +157,7 @@ public abstract class FateIT extends SharedMiniClusterBase 
{
 
   protected void testTransactionStatus(FateStore<TestEnv> store, ServerContext 
sctx)
       throws Exception {
-    ConfigurationCopy config = new ConfigurationCopy();
-    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
-    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-    TestEnv testEnv = new TestEnv();
-    Fate<TestEnv> fate = new Fate<>(testEnv, store, r -> r + "", config);
+    Fate<TestEnv> fate = initializeFate(store);
     try {
 
       // Wait for the transaction runner to be scheduled.
@@ -141,11 +189,7 @@ public abstract class FateIT extends SharedMiniClusterBase 
{
   }
 
   protected void testCancelWhileNew(FateStore<TestEnv> store, ServerContext 
sctx) throws Exception {
-    ConfigurationCopy config = new ConfigurationCopy();
-    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
-    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-    TestEnv testEnv = new TestEnv();
-    Fate<TestEnv> fate = new Fate<>(testEnv, store, r -> r + "", config);
+    Fate<TestEnv> fate = initializeFate(store);
     try {
 
       // Wait for the transaction runner to be scheduled.
@@ -180,11 +224,7 @@ public abstract class FateIT extends SharedMiniClusterBase 
{
 
   protected void testCancelWhileSubmittedAndRunning(FateStore<TestEnv> store, 
ServerContext sctx)
       throws Exception {
-    ConfigurationCopy config = new ConfigurationCopy();
-    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
-    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-    TestEnv testEnv = new TestEnv();
-    Fate<TestEnv> fate = new Fate<>(testEnv, store, r -> r + "", config);
+    Fate<TestEnv> fate = initializeFate(store);
     try {
 
       // Wait for the transaction runner to be scheduled.
@@ -219,11 +259,7 @@ public abstract class FateIT extends SharedMiniClusterBase 
{
 
   protected void testCancelWhileInCall(FateStore<TestEnv> store, ServerContext 
sctx)
       throws Exception {
-    ConfigurationCopy config = new ConfigurationCopy();
-    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
-    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-    TestEnv testEnv = new TestEnv();
-    Fate<TestEnv> fate = new Fate<>(testEnv, store, r -> r + "", config);
+    Fate<TestEnv> fate = initializeFate(store);
     try {
 
       // Wait for the transaction runner to be scheduled.
@@ -248,10 +284,100 @@ public abstract class FateIT extends 
SharedMiniClusterBase {
 
   }
 
-  protected abstract TStatus getTxStatus(ServerContext sctx, long txid) throws 
Exception;
+  @Test
+  @Timeout(30)
+  public void testDeferredOverflow() throws Exception {
+    // Set a maximum deferred map size of 10 transactions so that when the 11th
+    // is seen the Fate store should clear the deferred map and mark
+    // the flag as overflow so that all the deferred transactions will be run
+    executeTest(this::testDeferredOverflow, 10);
+  }
+
+  protected void testDeferredOverflow(FateStore<TestEnv> store, ServerContext 
sctx)
+      throws Exception {
+    Fate<TestEnv> fate = initializeFate(store);
+    try {
+
+      // Wait for the transaction runner to be scheduled.
+      Thread.sleep(3000);
+
+      DeferredTestRepo.executedCalls.set(0);
+      // Initialize the repo to have a delay of 30 seconds
+      // so it will be deferred when submitted
+      DeferredTestRepo.delay.set(30000);
+
+      Set<Long> transactions = new HashSet<>();
+
+      // Start by creating 10 transactions that are all deferred which should
+      // fill up the deferred map with all 10 as we set the max deferred limit
+      // to only allow 10 transactions
+      for (int i = 0; i < 10; i++) {
+        submitDeferred(fate, sctx, transactions);
+      }
+
+      // Verify all 10 are deferred in the map.
+      // We should not be in an overflow state yet
+      Wait.waitFor(() -> store.getDeferredCount() == 10);
+      assertFalse(store.isDeferredOverflow());
+
+      // After verifying all 10 are deferred, submit another 10
+      // which should trigger an overflow. We are blocking in the
+      // call method of DeferredTestRepo at this point using a countdown
+      // latch to prevent fate executor from running early and clearing
+      // the deferred overflow flag before we can check it below
+      for (int i = 0; i < 10; i++) {
+        submitDeferred(fate, sctx, transactions);
+      }
+      // Verify deferred overflow is true and map is now empty
+      Wait.waitFor(() -> store.getDeferredCount() == 0);
+      Wait.waitFor(store::isDeferredOverflow);
+
+      // Set the delay to 0 and countdown so we will process the
+      // call method in the repos. We need to change the delay because
+      // due to the async nature of Fate it's possible some of the submitted
+      // repos previously wouldn't be processed in the first batch until
+      // after the flag was cleared which would trigger a long delay again
+      DeferredTestRepo.delay.set(0);
+      DeferredTestRepo.callLatch.countDown();
+
+      // Verify the flag was cleared and everything ran
+      Wait.waitFor(() -> !store.isDeferredOverflow());
+      Wait.waitFor(() -> DeferredTestRepo.executedCalls.get() == 20);
+
+      // Verify all 20 unique transactions finished
+      Wait.waitFor(() -> {
+        transactions.removeIf(txid -> getTxStatus(sctx, txid) == UNKNOWN);
+        return transactions.isEmpty();
+      });
+
+    } finally {
+      fate.shutdown();
+    }
+  }
+
+  private void submitDeferred(Fate<TestEnv> fate, ServerContext sctx, 
Set<Long> transactions) {
+    long txid = fate.startTransaction();
+    transactions.add(txid);
+    assertEquals(TStatus.NEW, getTxStatus(sctx, txid));
+    fate.seedTransaction("TestOperation", txid, new 
DeferredTestRepo("testDeferredOverflow"), true,
+        "Test Op");
+    assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, txid));
+  }
+
+  protected Fate<TestEnv> initializeFate(FateStore<TestEnv> store) {
+    ConfigurationCopy config = new ConfigurationCopy();
+    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+    return new Fate<>(new TestEnv(), store, r -> r + "", config);
+  }
+
+  protected abstract TStatus getTxStatus(ServerContext sctx, long txid);
 
   protected abstract void executeTest(FateTestExecutor testMethod) throws 
Exception;
 
+  protected abstract void executeTest(FateTestExecutor testMethod, int 
maxDeferred)
+      throws Exception;
+
   protected interface FateTestExecutor {
     void execute(FateStore<TestEnv> store, ServerContext sctx) throws 
Exception;
   }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java
index 8e494829d6..c71c1d1229 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java
@@ -52,12 +52,17 @@ public class AccumuloFateIT extends FateIT {
 
   @Override
   protected void executeTest(FateTestExecutor testMethod) throws Exception {
+    executeTest(testMethod, 1000);
+  }
+
+  @Override
+  protected void executeTest(FateTestExecutor testMethod, int maxDeferred) 
throws Exception {
     table = getUniqueNames(1)[0];
     try (ClientContext client =
         (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
       client.tableOperations().create(table);
 
-      final AccumuloStore<TestEnv> accumuloStore = new AccumuloStore<>(client, 
table);
+      final AccumuloStore<TestEnv> accumuloStore = new AccumuloStore<>(client, 
table, maxDeferred);
       testMethod.execute(accumuloStore, getCluster().getServerContext());
     }
   }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java
 
b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java
index 587299a0db..aee077f8c0 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java
@@ -19,12 +19,18 @@
 package org.apache.accumulo.test.fate.accumulo;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.time.Duration;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.clientImpl.ClientContext;
@@ -36,6 +42,7 @@ import org.apache.accumulo.core.fate.accumulo.AccumuloStore;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.test.fate.FateIT.TestEnv;
 import org.apache.accumulo.test.fate.FateIT.TestRepo;
+import org.apache.accumulo.test.util.Wait;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -119,6 +126,89 @@ public class AccumuloStoreReadWriteIT extends 
SharedMiniClusterBase {
     }
   }
 
+  @Test
+  public void testDeferredOverflow() throws Exception {
+    final String table = getUniqueNames(1)[0];
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table);
+
+      AccumuloStore<TestEnv> store = new AccumuloStore<>(client, table, 10);
+      // Verify no transactions
+      assertEquals(0, store.list().count());
+      assertFalse(store.isDeferredOverflow());
+
+      // Store 10 transactions that are all deferred
+      final Set<Long> transactions = new HashSet<>();
+      for (int i = 0; i < 10; i++) {
+        long tid = store.create();
+        transactions.add(tid);
+        FateTxStore<TestEnv> txStore = store.reserve(tid);
+        txStore.setStatus(TStatus.SUBMITTED);
+        assertTrue(txStore.timeCreated() > 0);
+        txStore.unreserve(10, TimeUnit.SECONDS);
+      }
+
+      // Verify we have 10 transactions and all are deferred
+      assertEquals(10, store.list().count());
+      assertEquals(10, store.getDeferredCount());
+
+      // Should still be false as we are at the max but not over yet
+      assertFalse(store.isDeferredOverflow());
+
+      var executor = Executors.newCachedThreadPool();
+      AtomicBoolean keepRunning = new AtomicBoolean(true);
+      try {
+        // Run and verify all 10 transactions still exist and were not
+        // run because of the deferral time of all the transactions
+        try {
+          executor.execute(() -> store.runnable(keepRunning, 
transactions::remove));
+          Thread.sleep(2000);
+          assertEquals(10, transactions.size());
+        } finally {
+          // Should terminate the task if waiting
+          keepRunning.set(false);
+        }
+
+        // Store one more that should go over the max deferred of 10
+        // and should clear the map and set the overflow flag
+        long tid = store.create();
+        transactions.add(tid);
+        FateTxStore<TestEnv> txStore = store.reserve(tid);
+        txStore.setStatus(TStatus.SUBMITTED);
+        txStore.unreserve(30, TimeUnit.SECONDS);
+
+        // Verify we have 11 transactions stored and none
+        // deferred anymore because of the overflow
+        assertEquals(11, store.list().count());
+        assertEquals(0, store.getDeferredCount());
+        assertTrue(store.isDeferredOverflow());
+
+        // Run and verify all 11 transactions were processed
+        // and removed from the store
+        keepRunning.set(true);
+        try {
+          executor.execute(() -> store.runnable(keepRunning, 
transactions::remove));
+          Wait.waitFor(transactions::isEmpty);
+        } finally {
+          // Should terminate the task if waiting
+          keepRunning.set(false);
+        }
+
+        // Overflow should now be reset to false so adding another deferred
+        // transaction should now go back into the deferral map and flag should
+        // still be false as we are under the limit
+        assertFalse(store.isDeferredOverflow());
+        txStore = store.reserve(store.create());
+        txStore.unreserve(30, TimeUnit.SECONDS);
+        assertEquals(1, store.getDeferredCount());
+        assertFalse(store.isDeferredOverflow());
+      } finally {
+        executor.shutdownNow();
+      }
+    }
+  }
+
   private static class TestOperation2 extends TestRepo {
 
     private static final long serialVersionUID = 1L;
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java
 
b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java
index c1948089b7..175785270d 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java
@@ -65,7 +65,12 @@ public class ZookeeperFateIT extends FateIT {
 
   @Override
   protected void executeTest(FateTestExecutor testMethod) throws Exception {
-    final ZooStore<TestEnv> zooStore = new ZooStore<>(ZK_ROOT + 
Constants.ZFATE, zk);
+    executeTest(testMethod, 1000);
+  }
+
+  @Override
+  protected void executeTest(FateTestExecutor testMethod, int maxDeferred) 
throws Exception {
+    final ZooStore<TestEnv> zooStore = new ZooStore<>(ZK_ROOT + 
Constants.ZFATE, zk, maxDeferred);
 
     ServerContext sctx = createMock(ServerContext.class);
     expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes();
@@ -76,9 +81,12 @@ public class ZookeeperFateIT extends FateIT {
   }
 
   @Override
-  protected TStatus getTxStatus(ServerContext sctx, long txid)
-      throws InterruptedException, KeeperException {
-    return getTxStatus(sctx.getZooReaderWriter(), txid);
+  protected TStatus getTxStatus(ServerContext sctx, long txid) {
+    try {
+      return getTxStatus(sctx.getZooReaderWriter(), txid);
+    } catch (KeeperException | InterruptedException e) {
+      throw new IllegalStateException(e);
+    }
   }
 
   /*

Reply via email to