This is an automated email from the ASF dual-hosted git repository.

krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new ee81a3c30d3 Adds some missing FATE functionality (#5263)
ee81a3c30d3 is described below

commit ee81a3c30d3a34604c33c042abb631db5952388b
Author: Kevin Rathbun <krath...@apache.org>
AuthorDate: Tue Jan 21 10:09:00 2025 -0500

    Adds some missing FATE functionality (#5263)
    
    - Previously, if a user configured fewer FATE threads to work on
      transactions (via the MANAGER_FATE_THREADPOOL_SIZE property), the pool
      size would not actually be decreased. These changes safely stop excess
      workers when the property value is decreased.
    - Added test FatePoolResizeIT (tests both META and USER transactions:
      MetaFatePoolResizeIT and UserFatePoolResizeIT) to ensure the pool size
      is correctly increased and decreased with configuration changes.
    - Fate.shutdown() was not waiting on termination of the fate pool watcher
      when needed. Added the missing wait.
    
    ---------
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../java/org/apache/accumulo/core/fate/Fate.java   | 150 ++++++++++++------
 .../org/apache/accumulo/test/fate/FastFate.java    |  14 +-
 .../accumulo/test/fate/FatePoolResizeIT.java       | 167 +++++++++++++++++++++
 .../test/fate/meta/MetaFatePoolResizeIT.java       |  66 ++++++++
 .../test/fate/user/UserFatePoolResizeIT.java       |  58 +++++++
 5 files changed, 402 insertions(+), 53 deletions(-)
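
The first bullet in the commit message above describes a flag-and-stop approach for shrinking the FATE worker pool. Below is a minimal, self-contained sketch of that pattern under simplified assumptions; the class and names (PoolResizeSketch, Worker) are illustrative only and do not exist in Accumulo, and the real TransactionRunner in Fate.java below additionally coordinates with keepRunning and the shared work queue.

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class PoolResizeSketch {

      // Stand-in for a TransactionRunner: polls for work until flagged to stop,
      // then deregisters itself on exit.
      static class Worker implements Runnable {
        private final AtomicBoolean stop = new AtomicBoolean(false);
        private final Set<Worker> running;

        Worker(Set<Worker> running) {
          this.running = running;
        }

        boolean flagStop() {
          // true only for the caller that actually flips the flag
          return stop.compareAndSet(false, true);
        }

        @Override
        public void run() {
          running.add(this);
          try {
            while (!stop.get()) {
              TimeUnit.MILLISECONDS.sleep(50); // placeholder for working on a transaction
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            running.remove(this);
          }
        }
      }

      public static void main(String[] args) throws Exception {
        Set<Worker> running = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newCachedThreadPool();

        // grow: submit one worker per configured thread
        for (int i = 0; i < 4; i++) {
          pool.execute(new Worker(running));
        }
        TimeUnit.MILLISECONDS.sleep(200);
        System.out.println("running after grow: " + running.size());   // expect 4

        // shrink: flag only the excess workers; each exits once its current loop finishes
        int numToStop = running.size() - 2;
        for (Worker w : running) {
          if (numToStop <= 0) {
            break;
          }
          if (w.flagStop()) {
            numToStop--;
          }
        }
        TimeUnit.MILLISECONDS.sleep(200);
        System.out.println("running after shrink: " + running.size()); // expect 2

        // shut everything down: flag the remaining workers, then wait for the pool
        running.forEach(Worker::flagStop);
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
      }
    }

As in the actual change, a worker is asked to stop with a compare-and-set so each excess slot is flagged exactly once, and the running count only shrinks after a worker finishes its current work.
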

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index ac579176293..566dc5e9eed 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -32,8 +32,11 @@ import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN;
 import static org.apache.accumulo.core.util.ShutdownUtil.isIOException;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedTransferQueue;
@@ -64,6 +67,9 @@ import org.apache.thrift.TApplicationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 /**
  * Fault tolerant executor
  */
@@ -76,6 +82,7 @@ public class Fate<T> {
   private final T environment;
   private final ScheduledThreadPoolExecutor fatePoolWatcher;
   private final ExecutorService transactionExecutor;
+  private final Set<TransactionRunner> runningTxRunners;
   private final ExecutorService deadResCleanerExecutor;
 
   private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);
@@ -179,9 +186,12 @@ public class Fate<T> {
   }
 
   private class TransactionRunner implements Runnable {
+    // used to signal a TransactionRunner to stop in the case where there are too many running
+    // i.e., the property for the pool size decreased and we have excess TransactionRunners
+    private final AtomicBoolean stop = new AtomicBoolean(false);
 
     private Optional<FateTxStore<T>> reserveFateTx() throws InterruptedException {
-      while (keepRunning.get()) {
+      while (keepRunning.get() && !stop.get()) {
         FateId unreservedFateId = workQueue.poll(100, MILLISECONDS);
 
         if (unreservedFateId == null) {
@@ -198,55 +208,61 @@ public class Fate<T> {
 
     @Override
     public void run() {
-      while (keepRunning.get()) {
-        FateTxStore<T> txStore = null;
-        ExecutionState state = new ExecutionState();
-        try {
-          var optionalopStore = reserveFateTx();
-          if (optionalopStore.isPresent()) {
-            txStore = optionalopStore.orElseThrow();
-          } else {
-            continue;
-          }
-          state.status = txStore.getStatus();
-          state.op = txStore.top();
-          if (state.status == FAILED_IN_PROGRESS) {
-            processFailed(txStore, state.op);
-          } else if (state.status == SUBMITTED || state.status == IN_PROGRESS) {
-            try {
-              execute(txStore, state);
-              if (state.op != null && state.deferTime != 0) {
-                // The current op is not ready to execute
-                continue;
-              }
-            } catch (StackOverflowException e) {
-              // the op that failed to push onto the stack was never executed, so no need to undo
-              // it just transition to failed and undo the ops that executed
-              transitionToFailed(txStore, e);
-              continue;
-            } catch (Exception e) {
-              blockIfHadoopShutdown(txStore.getID(), e);
-              transitionToFailed(txStore, e);
+      runningTxRunners.add(this);
+      try {
+        while (keepRunning.get() && !stop.get()) {
+          FateTxStore<T> txStore = null;
+          ExecutionState state = new ExecutionState();
+          try {
+            var optionalopStore = reserveFateTx();
+            if (optionalopStore.isPresent()) {
+              txStore = optionalopStore.orElseThrow();
+            } else {
               continue;
             }
+            state.status = txStore.getStatus();
+            state.op = txStore.top();
+            if (state.status == FAILED_IN_PROGRESS) {
+              processFailed(txStore, state.op);
+            } else if (state.status == SUBMITTED || state.status == IN_PROGRESS) {
+              try {
+                execute(txStore, state);
+                if (state.op != null && state.deferTime != 0) {
+                  // The current op is not ready to execute
+                  continue;
+                }
+              } catch (StackOverflowException e) {
+                // the op that failed to push onto the stack was never executed, so no need to undo
+                // it just transition to failed and undo the ops that executed
+                transitionToFailed(txStore, e);
+                continue;
+              } catch (Exception e) {
+                blockIfHadoopShutdown(txStore.getID(), e);
+                transitionToFailed(txStore, e);
+                continue;
+              }
 
-            if (state.op == null) {
-              // transaction is finished
-              String ret = state.prevOp.getReturn();
-              if (ret != null) {
-                txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret);
+              if (state.op == null) {
+                // transaction is finished
+                String ret = state.prevOp.getReturn();
+                if (ret != null) {
+                  txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret);
+                }
+                txStore.setStatus(SUCCESSFUL);
+                doCleanUp(txStore);
               }
-              txStore.setStatus(SUCCESSFUL);
-              doCleanUp(txStore);
             }
-          }
-        } catch (Exception e) {
-          runnerLog.error("Uncaught exception in FATE runner thread.", e);
-        } finally {
-          if (txStore != null) {
-            txStore.unreserve(Duration.ofMillis(state.deferTime));
+          } catch (Exception e) {
+            runnerLog.error("Uncaught exception in FATE runner thread.", e);
+          } finally {
+            if (txStore != null) {
+              txStore.unreserve(Duration.ofMillis(state.deferTime));
+            }
           }
         }
+      } finally {
+        log.trace("A TransactionRunner is exiting...");
+        Preconditions.checkState(runningTxRunners.remove(this));
       }
     }
 
@@ -357,6 +373,14 @@ public class Fate<T> {
       }
     }
 
+    private boolean flagStop() {
+      return stop.compareAndSet(false, true);
+    }
+
+    private boolean isFlaggedToStop() {
+      return stop.get();
+    }
+
   }
 
   protected long executeIsReady(FateId fateId, Repo<T> op) throws Exception {
@@ -400,15 +424,16 @@ public class Fate<T> {
     final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf,
         Property.MANAGER_FATE_THREADPOOL_SIZE, true);
     this.workQueue = new LinkedTransferQueue<>();
+    this.runningTxRunners = Collections.synchronizedSet(new HashSet<>());
     this.fatePoolWatcher =
         ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
     ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(() -> {
       // resize the pool if the property changed
       ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
-      // If the pool grew, then ensure that there is a TransactionRunner for each thread
       final int configured = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
-      final int needed = configured - pool.getActiveCount();
+      final int needed = configured - runningTxRunners.size();
       if (needed > 0) {
+        // If the pool grew, then ensure that there is a TransactionRunner for each thread
         for (int i = 0; i < needed; i++) {
           try {
             pool.execute(new TransactionRunner());
@@ -425,11 +450,26 @@ public class Fate<T> {
           }
         }
         idleCountHistory.clear();
+      } else if (needed < 0) {
+        // If we need the pool to shrink, then ensure excess TransactionRunners are safely stopped.
+        // Flag the necessary number of TransactionRunners to safely stop when they are done work
+        // on a transaction.
+        int numFlagged =
+            (int) runningTxRunners.stream().filter(TransactionRunner::isFlaggedToStop).count();
+        int numToStop = -1 * (numFlagged + needed);
+        for (var runner : runningTxRunners) {
+          if (numToStop <= 0) {
+            break;
+          }
+          if (runner.flagStop()) {
+            log.trace("Flagging a TransactionRunner to stop...");
+            numToStop--;
+          }
+        }
       } else {
         // The property did not change, but should it based on idle Fate threads? Maintain
         // count of the last X minutes of idle Fate threads. If zero 95% of the time, then suggest
-        // that the
-        // MANAGER_FATE_THREADPOOL_SIZE be increased.
+        // that the MANAGER_FATE_THREADPOOL_SIZE be increased.
         final long interval = Math.min(60, TimeUnit.MILLISECONDS
             .toMinutes(conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL)));
         if (interval == 0) {
@@ -488,6 +528,11 @@ public class Fate<T> {
     return POOL_WATCHER_DELAY;
   }
 
+  @VisibleForTesting
+  public int getTxRunnersActive() {
+    return runningTxRunners.size();
+  }
+
   // get a transaction id back to the requester before doing any work
   public FateId startTransaction() {
     return store.create();
@@ -622,10 +667,15 @@ public class Fate<T> {
     if (timeout > 0) {
       long start = System.nanoTime();
 
-      while ((System.nanoTime() - start) < timeUnit.toNanos(timeout)
-          && (workFinder.isAlive() || !transactionExecutor.isTerminated()
-              || (deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated()))) {
+      while ((System.nanoTime() - start) < timeUnit.toNanos(timeout) && (workFinder.isAlive()
+          || !transactionExecutor.isTerminated() || !fatePoolWatcher.isTerminated()
+          || (deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated()))) {
         try {
+          if (!fatePoolWatcher.awaitTermination(1, SECONDS)) {
+            log.debug("Fate {} is waiting for pool watcher to terminate", store.type());
+            continue;
+          }
+
           if (!transactionExecutor.awaitTermination(1, SECONDS)) {
             log.debug("Fate {} is waiting for worker threads to terminate", store.type());
             continue;
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java
index f15fb6caaa1..e33906f29c1 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java
@@ -27,10 +27,13 @@ import org.apache.accumulo.core.fate.FateStore;
 import org.apache.accumulo.core.fate.Repo;
 
 /**
- * A FATE which performs the dead reservation cleanup with a much shorter delay between. Useful for
- * shortening test times for tests that are waiting for a cleanup to occur.
+ * A FATE which performs the dead reservation cleanup and the check on the pool size with a much
+ * shorter delay between. Useful for shortening test times for tests that are waiting for one of
+ * these actions to occur.
  */
 public class FastFate<T> extends Fate<T> {
+  private static final Duration DEAD_RES_CLEANUP_DELAY = Duration.ofSeconds(5);
+  private static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(5);
 
   public FastFate(T environment, FateStore<T> store, boolean runDeadResCleaner,
       Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
@@ -39,6 +42,11 @@ public class FastFate<T> extends Fate<T> {
 
   @Override
   public Duration getDeadResCleanupDelay() {
-    return Duration.ofSeconds(5);
+    return DEAD_RES_CLEANUP_DELAY;
+  }
+
+  @Override
+  public Duration getPoolWatcherDelay() {
+    return POOL_WATCHER_DELAY;
   }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FatePoolResizeIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FatePoolResizeIT.java
new file mode 100644
index 00000000000..7b429fdde02
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FatePoolResizeIT.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import static org.apache.accumulo.test.fate.FateStoreUtil.TEST_FATE_OP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.util.Wait;
+import org.junit.jupiter.api.Test;
+
+public abstract class FatePoolResizeIT extends SharedMiniClusterBase
+    implements FateTestRunner<FatePoolResizeIT.PoolResizeTestEnv> {
+
+  @Test
+  public void testIncreaseSize() throws Exception {
+    executeTest(this::testIncreaseSize);
+  }
+
+  protected void testIncreaseSize(FateStore<PoolResizeTestEnv> store, ServerContext sctx)
+      throws Exception {
+    final int numThreads = 10;
+    final int newNumThreads = 20;
+    final ConfigurationCopy config = initConfig(numThreads);
+    final var env = new PoolResizeTestEnv();
+    final Fate<PoolResizeTestEnv> fate = new FastFate<>(env, store, false, r -> r + "", config);
+    try {
+      // create a tx for all future threads. For now, only some of the txns will be workable
+      for (int i = 0; i < newNumThreads; i++) {
+        var fateId = fate.startTransaction();
+        fate.seedTransaction(TEST_FATE_OP, fateId, new PoolResizeTestRepo(), true, "testing");
+      }
+      // wait for all available threads to be working on a tx
+      Wait.waitFor(() -> env.numWorkers.get() == numThreads);
+      assertEquals(numThreads, fate.getTxRunnersActive());
+      config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, String.valueOf(newNumThreads));
+      // wait for new config to be detected, new runners to be created, and for these runners to
+      // pick up the rest of the available txns
+      Wait.waitFor(() -> env.numWorkers.get() == newNumThreads);
+      assertEquals(newNumThreads, fate.getTxRunnersActive());
+      // finish work
+      env.isReadyLatch.countDown();
+      Wait.waitFor(() -> env.numWorkers.get() == 0);
+      // workers should still be running since we haven't shutdown, just not working on anything
+      assertEquals(newNumThreads, fate.getTxRunnersActive());
+    } finally {
+      fate.shutdown(1, TimeUnit.MINUTES);
+      assertEquals(0, fate.getTxRunnersActive());
+    }
+  }
+
+  @Test
+  public void testDecreaseSize() throws Exception {
+    executeTest(this::testDecreaseSize);
+  }
+
+  protected void testDecreaseSize(FateStore<PoolResizeTestEnv> store, ServerContext sctx)
+      throws Exception {
+    final int numThreads = 20;
+    final int newNumThreads = 10;
+    final ConfigurationCopy config = initConfig(numThreads);
+    final var env = new PoolResizeTestEnv();
+    final Fate<PoolResizeTestEnv> fate = new FastFate<>(env, store, false, r -> r + "", config);
+    try {
+      // create a tx for each thread
+      for (int i = 0; i < numThreads; i++) {
+        var fateId = fate.startTransaction();
+        fate.seedTransaction(TEST_FATE_OP, fateId, new PoolResizeTestRepo(), true, "testing");
+      }
+      // wait for all threads to be working on a tx
+      Wait.waitFor(() -> env.numWorkers.get() == numThreads);
+      assertEquals(numThreads, fate.getTxRunnersActive());
+      config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, String.valueOf(newNumThreads));
+      // ensure another execution of the pool watcher task occurs after we change the size
+      Thread.sleep(fate.getPoolWatcherDelay().toMillis() * 2);
+      // at this point, FATE should detect that there are more transaction runners running than
+      // configured. None can be safely stopped yet, (still in progress - haven't passed isReady).
+      // We ensure none have been unexpectedly stopped, then we allow isReady to pass and the txns
+      // to complete
+      assertEquals(numThreads, env.numWorkers.get());
+      env.isReadyLatch.countDown();
+      // wait for the pool size to be decreased to the expected value
+      Wait.waitFor(() -> fate.getTxRunnersActive() == newNumThreads);
+      // wait for all threads to have completed their tx
+      Wait.waitFor(() -> env.numWorkers.get() == 0);
+      // wait a bit longer to ensure no more workers than expected were stopped
+      Thread.sleep(5_000);
+      assertEquals(newNumThreads, fate.getTxRunnersActive());
+    } finally {
+      fate.shutdown(1, TimeUnit.MINUTES);
+      assertEquals(0, fate.getTxRunnersActive());
+    }
+  }
+
+  private ConfigurationCopy initConfig(int numThreads) {
+    ConfigurationCopy config = new ConfigurationCopy();
+    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, String.valueOf(numThreads));
+    config.set(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL, "60m");
+    return config;
+  }
+
+  public static class PoolResizeTestRepo implements Repo<PoolResizeTestEnv> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public long isReady(FateId fateId, PoolResizeTestEnv environment) throws Exception {
+      environment.numWorkers.incrementAndGet();
+      environment.isReadyLatch.await();
+      return 0;
+    }
+
+    @Override
+    public String getName() {
+      return this.getClass().getSimpleName();
+    }
+
+    @Override
+    public Repo<PoolResizeTestEnv> call(FateId fateId, PoolResizeTestEnv environment)
+        throws Exception {
+      environment.numWorkers.decrementAndGet();
+      return null;
+    }
+
+    @Override
+    public void undo(FateId fateId, PoolResizeTestEnv environment) throws Exception {
+
+    }
+
+    @Override
+    public String getReturn() {
+      return null;
+    }
+  }
+
+  public static class PoolResizeTestEnv extends TestEnv {
+    private final AtomicInteger numWorkers = new AtomicInteger(0);
+    private final CountDownLatch isReadyLatch = new CountDownLatch(1);
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolResizeIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolResizeIT.java
new file mode 100644
index 00000000000..5ae5242a7c7
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolResizeIT.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.meta;
+
+import static org.apache.accumulo.test.fate.TestLock.createDummyLockID;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+import java.io.File;
+
+import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.fate.FatePoolResizeIT;
+import org.apache.accumulo.test.fate.FateStoreUtil;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+
+public class MetaFatePoolResizeIT extends FatePoolResizeIT {
+  @TempDir
+  private static File tempDir;
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    FateStoreUtil.MetaFateZKSetup.setup(tempDir);
+  }
+
+  @AfterAll
+  public static void teardown() throws Exception {
+    FateStoreUtil.MetaFateZKSetup.teardown();
+  }
+
+  @Override
+  public void executeTest(FateTestExecutor<PoolResizeTestEnv> testMethod, int maxDeferred,
+      AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
+    String zkRoot = FateStoreUtil.MetaFateZKSetup.getZkRoot();
+    var zk = FateStoreUtil.MetaFateZKSetup.getZk();
+    String fatePath = FateStoreUtil.MetaFateZKSetup.getZkFatePath();
+    ServerContext sctx = createMock(ServerContext.class);
+    expect(sctx.getZooKeeperRoot()).andReturn(zkRoot).anyTimes();
+    expect(sctx.getZooSession()).andReturn(zk).anyTimes();
+    replay(sctx);
+
+    testMethod.execute(
+        new MetaFateStore<>(fatePath, zk, createDummyLockID(), null, maxDeferred, fateIdGenerator),
+        sctx);
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolResizeIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolResizeIT.java
new file mode 100644
index 00000000000..77ae75cd63b
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolResizeIT.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.user;
+
+import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable;
+import static org.apache.accumulo.test.fate.TestLock.createDummyLockID;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.test.fate.FatePoolResizeIT;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+public class UserFatePoolResizeIT extends FatePoolResizeIT {
+
+  private String table;
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    SharedMiniClusterBase.startMiniCluster();
+  }
+
+  @AfterAll
+  public static void teardown() {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  @Override
+  public void executeTest(FateTestExecutor<PoolResizeTestEnv> testMethod, int maxDeferred,
+      AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
+    table = getUniqueNames(1)[0];
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+      createFateTable(client, table);
+      testMethod.execute(new UserFateStore<>(client, table, createDummyLockID(), null, maxDeferred,
+          fateIdGenerator), getCluster().getServerContext());
+    }
+  }
+}
