This is an automated email from the ASF dual-hosted git repository.
krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new ee81a3c30d3 Adds some missing FATE functionality (#5263)
ee81a3c30d3 is described below
commit ee81a3c30d3a34604c33c042abb631db5952388b
Author: Kevin Rathbun <[email protected]>
AuthorDate: Tue Jan 21 10:09:00 2025 -0500
Adds some missing FATE functionality (#5263)
- Previously, if a user configured fewer FATE threads to work on
transactions (via MANAGER_FATE_THREADPOOL_SIZE property), the pool size would
not actually be decreased. These changes safely stop excess workers in the case
where the property value is decreased.
- Added test FatePoolResizeIT (tests both META and USER transactions:
MetaFatePoolResizeIT and UserFatePoolResizeIT) to ensure the pool size is
correctly increased and decreased with configuration changes.
- Fate.shutdown() was not waiting on termination of the fate pool watcher
when needed. Added the missing wait.
---------
Co-authored-by: Keith Turner <[email protected]>
---
.../java/org/apache/accumulo/core/fate/Fate.java | 150 ++++++++++++------
.../org/apache/accumulo/test/fate/FastFate.java | 14 +-
.../accumulo/test/fate/FatePoolResizeIT.java | 167 +++++++++++++++++++++
.../test/fate/meta/MetaFatePoolResizeIT.java | 66 ++++++++
.../test/fate/user/UserFatePoolResizeIT.java | 58 +++++++
5 files changed, 402 insertions(+), 53 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index ac579176293..566dc5e9eed 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -32,8 +32,11 @@ import static
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN;
import static org.apache.accumulo.core.util.ShutdownUtil.isIOException;
import java.time.Duration;
+import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedTransferQueue;
@@ -64,6 +67,9 @@ import org.apache.thrift.TApplicationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
/**
* Fault tolerant executor
*/
@@ -76,6 +82,7 @@ public class Fate<T> {
private final T environment;
private final ScheduledThreadPoolExecutor fatePoolWatcher;
private final ExecutorService transactionExecutor;
+ private final Set<TransactionRunner> runningTxRunners;
private final ExecutorService deadResCleanerExecutor;
private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED,
SUCCESSFUL, UNKNOWN);
@@ -179,9 +186,12 @@ public class Fate<T> {
}
private class TransactionRunner implements Runnable {
+ // used to signal a TransactionRunner to stop in the case where there are
too many running
+ // i.e., the property for the pool size decreased and we have excess
TransactionRunners
+ private final AtomicBoolean stop = new AtomicBoolean(false);
private Optional<FateTxStore<T>> reserveFateTx() throws
InterruptedException {
- while (keepRunning.get()) {
+ while (keepRunning.get() && !stop.get()) {
FateId unreservedFateId = workQueue.poll(100, MILLISECONDS);
if (unreservedFateId == null) {
@@ -198,55 +208,61 @@ public class Fate<T> {
@Override
public void run() {
- while (keepRunning.get()) {
- FateTxStore<T> txStore = null;
- ExecutionState state = new ExecutionState();
- try {
- var optionalopStore = reserveFateTx();
- if (optionalopStore.isPresent()) {
- txStore = optionalopStore.orElseThrow();
- } else {
- continue;
- }
- state.status = txStore.getStatus();
- state.op = txStore.top();
- if (state.status == FAILED_IN_PROGRESS) {
- processFailed(txStore, state.op);
- } else if (state.status == SUBMITTED || state.status == IN_PROGRESS)
{
- try {
- execute(txStore, state);
- if (state.op != null && state.deferTime != 0) {
- // The current op is not ready to execute
- continue;
- }
- } catch (StackOverflowException e) {
- // the op that failed to push onto the stack was never executed,
so no need to undo
- // it just transition to failed and undo the ops that executed
- transitionToFailed(txStore, e);
- continue;
- } catch (Exception e) {
- blockIfHadoopShutdown(txStore.getID(), e);
- transitionToFailed(txStore, e);
+ runningTxRunners.add(this);
+ try {
+ while (keepRunning.get() && !stop.get()) {
+ FateTxStore<T> txStore = null;
+ ExecutionState state = new ExecutionState();
+ try {
+ var optionalopStore = reserveFateTx();
+ if (optionalopStore.isPresent()) {
+ txStore = optionalopStore.orElseThrow();
+ } else {
continue;
}
+ state.status = txStore.getStatus();
+ state.op = txStore.top();
+ if (state.status == FAILED_IN_PROGRESS) {
+ processFailed(txStore, state.op);
+ } else if (state.status == SUBMITTED || state.status ==
IN_PROGRESS) {
+ try {
+ execute(txStore, state);
+ if (state.op != null && state.deferTime != 0) {
+ // The current op is not ready to execute
+ continue;
+ }
+ } catch (StackOverflowException e) {
+ // the op that failed to push onto the stack was never
executed, so no need to undo
+ // it just transition to failed and undo the ops that executed
+ transitionToFailed(txStore, e);
+ continue;
+ } catch (Exception e) {
+ blockIfHadoopShutdown(txStore.getID(), e);
+ transitionToFailed(txStore, e);
+ continue;
+ }
- if (state.op == null) {
- // transaction is finished
- String ret = state.prevOp.getReturn();
- if (ret != null) {
- txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret);
+ if (state.op == null) {
+ // transaction is finished
+ String ret = state.prevOp.getReturn();
+ if (ret != null) {
+ txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret);
+ }
+ txStore.setStatus(SUCCESSFUL);
+ doCleanUp(txStore);
}
- txStore.setStatus(SUCCESSFUL);
- doCleanUp(txStore);
}
- }
- } catch (Exception e) {
- runnerLog.error("Uncaught exception in FATE runner thread.", e);
- } finally {
- if (txStore != null) {
- txStore.unreserve(Duration.ofMillis(state.deferTime));
+ } catch (Exception e) {
+ runnerLog.error("Uncaught exception in FATE runner thread.", e);
+ } finally {
+ if (txStore != null) {
+ txStore.unreserve(Duration.ofMillis(state.deferTime));
+ }
}
}
+ } finally {
+ log.trace("A TransactionRunner is exiting...");
+ Preconditions.checkState(runningTxRunners.remove(this));
}
}
@@ -357,6 +373,14 @@ public class Fate<T> {
}
}
+ private boolean flagStop() {
+ return stop.compareAndSet(false, true);
+ }
+
+ private boolean isFlaggedToStop() {
+ return stop.get();
+ }
+
}
protected long executeIsReady(FateId fateId, Repo<T> op) throws Exception {
@@ -400,15 +424,16 @@ public class Fate<T> {
final ThreadPoolExecutor pool =
ThreadPools.getServerThreadPools().createExecutorService(conf,
Property.MANAGER_FATE_THREADPOOL_SIZE, true);
this.workQueue = new LinkedTransferQueue<>();
+ this.runningTxRunners = Collections.synchronizedSet(new HashSet<>());
this.fatePoolWatcher =
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(()
-> {
// resize the pool if the property changed
ThreadPools.resizePool(pool, conf,
Property.MANAGER_FATE_THREADPOOL_SIZE);
- // If the pool grew, then ensure that there is a TransactionRunner for
each thread
final int configured =
conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
- final int needed = configured - pool.getActiveCount();
+ final int needed = configured - runningTxRunners.size();
if (needed > 0) {
+ // If the pool grew, then ensure that there is a TransactionRunner for
each thread
for (int i = 0; i < needed; i++) {
try {
pool.execute(new TransactionRunner());
@@ -425,11 +450,26 @@ public class Fate<T> {
}
}
idleCountHistory.clear();
+ } else if (needed < 0) {
+ // If we need the pool to shrink, then ensure excess
TransactionRunners are safely stopped.
+ // Flag the necessary number of TransactionRunners to safely stop when
they are done work
+ // on a transaction.
+ int numFlagged =
+ (int)
runningTxRunners.stream().filter(TransactionRunner::isFlaggedToStop).count();
+ int numToStop = -1 * (numFlagged + needed);
+ for (var runner : runningTxRunners) {
+ if (numToStop <= 0) {
+ break;
+ }
+ if (runner.flagStop()) {
+ log.trace("Flagging a TransactionRunner to stop...");
+ numToStop--;
+ }
+ }
} else {
// The property did not change, but should it based on idle Fate
threads? Maintain
// count of the last X minutes of idle Fate threads. If zero 95% of
the time, then suggest
- // that the
- // MANAGER_FATE_THREADPOOL_SIZE be increased.
+ // that the MANAGER_FATE_THREADPOOL_SIZE be increased.
final long interval = Math.min(60, TimeUnit.MILLISECONDS
.toMinutes(conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL)));
if (interval == 0) {
@@ -488,6 +528,11 @@ public class Fate<T> {
return POOL_WATCHER_DELAY;
}
+ @VisibleForTesting
+ public int getTxRunnersActive() {
+ return runningTxRunners.size();
+ }
+
// get a transaction id back to the requester before doing any work
public FateId startTransaction() {
return store.create();
@@ -622,10 +667,15 @@ public class Fate<T> {
if (timeout > 0) {
long start = System.nanoTime();
- while ((System.nanoTime() - start) < timeUnit.toNanos(timeout)
- && (workFinder.isAlive() || !transactionExecutor.isTerminated()
- || (deadResCleanerExecutor != null &&
!deadResCleanerExecutor.isTerminated()))) {
+ while ((System.nanoTime() - start) < timeUnit.toNanos(timeout) &&
(workFinder.isAlive()
+ || !transactionExecutor.isTerminated() ||
!fatePoolWatcher.isTerminated()
+ || (deadResCleanerExecutor != null &&
!deadResCleanerExecutor.isTerminated()))) {
try {
+ if (!fatePoolWatcher.awaitTermination(1, SECONDS)) {
+ log.debug("Fate {} is waiting for pool watcher to terminate",
store.type());
+ continue;
+ }
+
if (!transactionExecutor.awaitTermination(1, SECONDS)) {
log.debug("Fate {} is waiting for worker threads to terminate",
store.type());
continue;
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java
b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java
index f15fb6caaa1..e33906f29c1 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java
@@ -27,10 +27,13 @@ import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.Repo;
/**
- * A FATE which performs the dead reservation cleanup with a much shorter
delay between. Useful for
- * shortening test times for tests that are waiting for a cleanup to occur.
+ * A FATE which performs the dead reservation cleanup and the check on the
pool size with a much
+ * shorter delay between. Useful for shortening test times for tests that are
waiting for one of
+ * these actions to occur.
*/
public class FastFate<T> extends Fate<T> {
+ private static final Duration DEAD_RES_CLEANUP_DELAY = Duration.ofSeconds(5);
+ private static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(5);
public FastFate(T environment, FateStore<T> store, boolean runDeadResCleaner,
Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
@@ -39,6 +42,11 @@ public class FastFate<T> extends Fate<T> {
@Override
public Duration getDeadResCleanupDelay() {
- return Duration.ofSeconds(5);
+ return DEAD_RES_CLEANUP_DELAY;
+ }
+
+ @Override
+ public Duration getPoolWatcherDelay() {
+ return POOL_WATCHER_DELAY;
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/FatePoolResizeIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/FatePoolResizeIT.java
new file mode 100644
index 00000000000..7b429fdde02
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FatePoolResizeIT.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import static org.apache.accumulo.test.fate.FateStoreUtil.TEST_FATE_OP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.util.Wait;
+import org.junit.jupiter.api.Test;
+
+public abstract class FatePoolResizeIT extends SharedMiniClusterBase
+ implements FateTestRunner<FatePoolResizeIT.PoolResizeTestEnv> {
+
+ @Test
+ public void testIncreaseSize() throws Exception {
+ executeTest(this::testIncreaseSize);
+ }
+
+ protected void testIncreaseSize(FateStore<PoolResizeTestEnv> store,
ServerContext sctx)
+ throws Exception {
+ final int numThreads = 10;
+ final int newNumThreads = 20;
+ final ConfigurationCopy config = initConfig(numThreads);
+ final var env = new PoolResizeTestEnv();
+ final Fate<PoolResizeTestEnv> fate = new FastFate<>(env, store, false, r
-> r + "", config);
+ try {
+ // create a tx for all future threads. For now, only some of the txns
will be workable
+ for (int i = 0; i < newNumThreads; i++) {
+ var fateId = fate.startTransaction();
+ fate.seedTransaction(TEST_FATE_OP, fateId, new PoolResizeTestRepo(),
true, "testing");
+ }
+ // wait for all available threads to be working on a tx
+ Wait.waitFor(() -> env.numWorkers.get() == numThreads);
+ assertEquals(numThreads, fate.getTxRunnersActive());
+ config.set(Property.MANAGER_FATE_THREADPOOL_SIZE,
String.valueOf(newNumThreads));
+ // wait for new config to be detected, new runners to be created, and
for these runners to
+ // pick up the rest of the available txns
+ Wait.waitFor(() -> env.numWorkers.get() == newNumThreads);
+ assertEquals(newNumThreads, fate.getTxRunnersActive());
+ // finish work
+ env.isReadyLatch.countDown();
+ Wait.waitFor(() -> env.numWorkers.get() == 0);
+ // workers should still be running since we haven't shutdown, just not
working on anything
+ assertEquals(newNumThreads, fate.getTxRunnersActive());
+ } finally {
+ fate.shutdown(1, TimeUnit.MINUTES);
+ assertEquals(0, fate.getTxRunnersActive());
+ }
+ }
+
+ @Test
+ public void testDecreaseSize() throws Exception {
+ executeTest(this::testDecreaseSize);
+ }
+
+ protected void testDecreaseSize(FateStore<PoolResizeTestEnv> store,
ServerContext sctx)
+ throws Exception {
+ final int numThreads = 20;
+ final int newNumThreads = 10;
+ final ConfigurationCopy config = initConfig(numThreads);
+ final var env = new PoolResizeTestEnv();
+ final Fate<PoolResizeTestEnv> fate = new FastFate<>(env, store, false, r
-> r + "", config);
+ try {
+ // create a tx for each thread
+ for (int i = 0; i < numThreads; i++) {
+ var fateId = fate.startTransaction();
+ fate.seedTransaction(TEST_FATE_OP, fateId, new PoolResizeTestRepo(),
true, "testing");
+ }
+ // wait for all threads to be working on a tx
+ Wait.waitFor(() -> env.numWorkers.get() == numThreads);
+ assertEquals(numThreads, fate.getTxRunnersActive());
+ config.set(Property.MANAGER_FATE_THREADPOOL_SIZE,
String.valueOf(newNumThreads));
+ // ensure another execution of the pool watcher task occurs after we
change the size
+ Thread.sleep(fate.getPoolWatcherDelay().toMillis() * 2);
+ // at this point, FATE should detect that there are more transaction
runners running than
+ // configured. None can be safely stopped yet, (still in progress -
haven't passed isReady).
+ // We ensure none have been unexpectedly stopped, then we allow isReady
to pass and the txns
+ // to complete
+ assertEquals(numThreads, env.numWorkers.get());
+ env.isReadyLatch.countDown();
+ // wait for the pool size to be decreased to the expected value
+ Wait.waitFor(() -> fate.getTxRunnersActive() == newNumThreads);
+ // wait for all threads to have completed their tx
+ Wait.waitFor(() -> env.numWorkers.get() == 0);
+ // wait a bit longer to ensure no more workers than expected were stopped
+ Thread.sleep(5_000);
+ assertEquals(newNumThreads, fate.getTxRunnersActive());
+ } finally {
+ fate.shutdown(1, TimeUnit.MINUTES);
+ assertEquals(0, fate.getTxRunnersActive());
+ }
+ }
+
+ private ConfigurationCopy initConfig(int numThreads) {
+ ConfigurationCopy config = new ConfigurationCopy();
+ config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+ config.set(Property.MANAGER_FATE_THREADPOOL_SIZE,
String.valueOf(numThreads));
+ config.set(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL, "60m");
+ return config;
+ }
+
+ public static class PoolResizeTestRepo implements Repo<PoolResizeTestEnv> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long isReady(FateId fateId, PoolResizeTestEnv environment) throws
Exception {
+ environment.numWorkers.incrementAndGet();
+ environment.isReadyLatch.await();
+ return 0;
+ }
+
+ @Override
+ public String getName() {
+ return this.getClass().getSimpleName();
+ }
+
+ @Override
+ public Repo<PoolResizeTestEnv> call(FateId fateId, PoolResizeTestEnv
environment)
+ throws Exception {
+ environment.numWorkers.decrementAndGet();
+ return null;
+ }
+
+ @Override
+ public void undo(FateId fateId, PoolResizeTestEnv environment) throws
Exception {
+
+ }
+
+ @Override
+ public String getReturn() {
+ return null;
+ }
+ }
+
+ public static class PoolResizeTestEnv extends TestEnv {
+ private final AtomicInteger numWorkers = new AtomicInteger(0);
+ private final CountDownLatch isReadyLatch = new CountDownLatch(1);
+ }
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolResizeIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolResizeIT.java
new file mode 100644
index 00000000000..5ae5242a7c7
--- /dev/null
+++
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolResizeIT.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.meta;
+
+import static org.apache.accumulo.test.fate.TestLock.createDummyLockID;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+import java.io.File;
+
+import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.fate.FatePoolResizeIT;
+import org.apache.accumulo.test.fate.FateStoreUtil;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+
+public class MetaFatePoolResizeIT extends FatePoolResizeIT {
+ @TempDir
+ private static File tempDir;
+
+ @BeforeAll
+ public static void setup() throws Exception {
+ FateStoreUtil.MetaFateZKSetup.setup(tempDir);
+ }
+
+ @AfterAll
+ public static void teardown() throws Exception {
+ FateStoreUtil.MetaFateZKSetup.teardown();
+ }
+
+ @Override
+ public void executeTest(FateTestExecutor<PoolResizeTestEnv> testMethod, int
maxDeferred,
+ AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
+ String zkRoot = FateStoreUtil.MetaFateZKSetup.getZkRoot();
+ var zk = FateStoreUtil.MetaFateZKSetup.getZk();
+ String fatePath = FateStoreUtil.MetaFateZKSetup.getZkFatePath();
+ ServerContext sctx = createMock(ServerContext.class);
+ expect(sctx.getZooKeeperRoot()).andReturn(zkRoot).anyTimes();
+ expect(sctx.getZooSession()).andReturn(zk).anyTimes();
+ replay(sctx);
+
+ testMethod.execute(
+ new MetaFateStore<>(fatePath, zk, createDummyLockID(), null,
maxDeferred, fateIdGenerator),
+ sctx);
+ }
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolResizeIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolResizeIT.java
new file mode 100644
index 00000000000..77ae75cd63b
--- /dev/null
+++
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolResizeIT.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.user;
+
+import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable;
+import static org.apache.accumulo.test.fate.TestLock.createDummyLockID;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.test.fate.FatePoolResizeIT;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+public class UserFatePoolResizeIT extends FatePoolResizeIT {
+
+ private String table;
+
+ @BeforeAll
+ public static void setup() throws Exception {
+ SharedMiniClusterBase.startMiniCluster();
+ }
+
+ @AfterAll
+ public static void teardown() {
+ SharedMiniClusterBase.stopMiniCluster();
+ }
+
+ @Override
+ public void executeTest(FateTestExecutor<PoolResizeTestEnv> testMethod, int
maxDeferred,
+ AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
+ table = getUniqueNames(1)[0];
+ try (ClientContext client =
+ (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+ createFateTable(client, table);
+ testMethod.execute(new UserFateStore<>(client, table,
createDummyLockID(), null, maxDeferred,
+ fateIdGenerator), getCluster().getServerContext());
+ }
+ }
+}