This is an automated email from the ASF dual-hosted git repository. krathbun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new bfaa80f0c1 Single conditional writer for UserFateStore (#5670) bfaa80f0c1 is described below commit bfaa80f0c1b19e14f9a942918c32caf2f42711d2 Author: Kevin Rathbun <krath...@apache.org> AuthorDate: Tue Jun 24 16:35:46 2025 -0400 Single conditional writer for UserFateStore (#5670) * Single conditional writer for UserFateStore Avoids creating multiple conditional writers for UserFateStore. * FateStore now extends AutoCloseable * MetaFateStore.close() is no-op * UserFateStore.close() closes the conditional writer * All tests using a FateStore now call close() * The real fate stores are closed on the shutdown of fate * UserFateStore now only creates a single conditional writer * Num threads for this conditional writer set by a new prop MANAGER_FATE_CONDITIONAL_WRITER_THREADS_MAX * writer in UserFateStore is a memoized Supplier so it's not created when using a ReadOnlyFateStore. closes #5660 --- .../org/apache/accumulo/core/conf/Property.java | 4 + .../java/org/apache/accumulo/core/fate/Fate.java | 3 + .../org/apache/accumulo/core/fate/FateStore.java | 4 +- .../accumulo/core/fate/user/FateMutatorImpl.java | 12 +- .../accumulo/core/fate/user/UserFateStore.java | 33 +- .../core/fate/zookeeper/MetaFateStore.java | 5 + .../apache/accumulo/core/logging/FateLogger.java | 5 + .../org/apache/accumulo/core/fate/TestStore.java | 5 + .../org/apache/accumulo/server/util/Admin.java | 63 +++- .../util/checkCommand/TableLocksCheckRunner.java | 77 ++-- .../test/compaction/ExternalCompaction_1_IT.java | 28 +- .../accumulo/test/fate/MultipleStoresITBase.java | 405 +++++++++++---------- .../meta/MetaFateExecutionOrderIT_SimpleSuite.java | 13 +- .../apache/accumulo/test/fate/meta/MetaFateIT.java | 7 +- .../test/fate/meta/MetaFateOpsCommandsIT.java | 8 +- .../test/fate/meta/MetaFatePoolsWatcherIT.java | 7 +- .../fate/meta/MetaFateStatusEnforcementIT.java | 6 + .../test/fate/meta/MetaFateStoreFateIT.java | 14 +- .../fate/user/FateMutatorImplIT_SimpleSuite.java | 95 +++-- .../user/UserFateExecutionOrderIT_SimpleSuite.java | 9 +- .../test/fate/user/UserFateIT_SimpleSuite.java | 9 +- .../test/fate/user/UserFateOpsCommandsIT.java | 14 +- .../user/UserFatePoolsWatcherIT_SimpleSuite.java | 9 +- .../UserFateStatusEnforcementIT_SimpleSuite.java | 1 + .../fate/user/UserFateStoreFateIT_SimpleSuite.java | 9 +- .../test/functional/FateConcurrencyIT.java | 8 +- .../test/functional/FunctionalTestUtils.java | 4 +- 27 files changed, 500 insertions(+), 357 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index f19798ccbb..988d61a059 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -447,6 +447,10 @@ public enum Property { MANAGER_WAL_CLOSER_IMPLEMENTATION("manager.wal.closer.implementation", "org.apache.accumulo.server.manager.recovery.HadoopLogCloser", PropertyType.CLASSNAME, "A class that implements a mechanism to steal write access to a write-ahead log.", "2.1.0"), + MANAGER_FATE_CONDITIONAL_WRITER_THREADS_MAX("manager.fate.conditional.writer.threads.max", "3", + PropertyType.COUNT, + "Maximum number of threads to use for writing data to tablet servers of the FATE system table.", + "4.0.0"), MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL("manager.fate.metrics.min.update.interval", "60s", PropertyType.TIMEDURATION, "Limit calls from metric sinks to zookeeper to update interval.", "1.9.3"), diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 560c7da0b2..6a577333a1 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -532,6 +532,9 @@ public class Fate<T> { if (deadResCleanerExecutor != null) { deadResCleanerExecutor.shutdownNow(); } + + // ensure store resources are cleaned up + store.close(); } private boolean anyFateExecutorIsAlive() { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index 3f5a8ec040..c73d5768bf 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -41,7 +41,7 @@ import org.apache.hadoop.io.DataInputBuffer; * transaction's operation, possibly pushing more operations onto the transaction as each step * successfully completes. If a step fails, the stack can be unwound, undoing each operation. */ -public interface FateStore<T> extends ReadOnlyFateStore<T> { +public interface FateStore<T> extends ReadOnlyFateStore<T>, AutoCloseable { /** * Create a new fate transaction id @@ -269,4 +269,6 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> { */ FateTxStore<T> reserve(FateId fateId); + @Override + void close(); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java index 264198cf93..4910e1e757 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java @@ -24,6 +24,7 @@ import static org.apache.accumulo.core.fate.user.UserFateStore.getRowId; import static org.apache.accumulo.core.fate.user.UserFateStore.invertRepo; import java.util.Objects; +import java.util.function.Supplier; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -58,14 +59,17 @@ public class FateMutatorImpl<T> implements FateMutator<T> { private final String tableName; private final FateId fateId; private final ConditionalMutation mutation; + private final Supplier<ConditionalWriter> writer; private boolean requiredUnreserved = false; public static final int INITIAL_ITERATOR_PRIO = 1000000; - public FateMutatorImpl(ClientContext context, String tableName, FateId fateId) { + public FateMutatorImpl(ClientContext context, String tableName, FateId fateId, + Supplier<ConditionalWriter> writer) { this.context = Objects.requireNonNull(context); this.tableName = Objects.requireNonNull(tableName); - this.fateId = fateId; + this.fateId = Objects.requireNonNull(fateId); this.mutation = new ConditionalMutation(new Text(getRowId(fateId))); + this.writer = Objects.requireNonNull(writer); } @Override @@ -237,8 +241,8 @@ public class FateMutatorImpl<T> implements FateMutator<T> { return Status.ACCEPTED; } else { - try (ConditionalWriter writer = context.createConditionalWriter(tableName)) { - ConditionalWriter.Result result = writer.write(mutation); + try { + ConditionalWriter.Result result = writer.get().write(mutation); switch (result.getStatus()) { case ACCEPTED: diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index d3d117c9fa..f38c50a2e9 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -41,9 +41,11 @@ import java.util.stream.Stream; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.ConditionalWriterConfig; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -73,6 +75,7 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Suppliers; public class UserFateStore<T> extends AbstractFateStore<T> { @@ -80,6 +83,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { private final ClientContext context; private final String tableName; + private final Supplier<ConditionalWriter> writer; private static final FateInstanceType fateInstanceType = FateInstanceType.USER; private static final com.google.common.collect.Range<Integer> REPO_RANGE = @@ -108,6 +112,14 @@ public class UserFateStore<T> extends AbstractFateStore<T> { super(lockID, isLockHeld, maxDeferred, fateIdGenerator); this.context = Objects.requireNonNull(context); this.tableName = Objects.requireNonNull(tableName); + this.writer = Suppliers.memoize(() -> { + try { + return createConditionalWriterForFateTable(this.tableName); + } catch (TableNotFoundException e) { + throw new IllegalStateException( + "Incorrect use of UserFateStore, table " + tableName + " does not exist."); + } + }); } @Override @@ -383,7 +395,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { } private FateMutatorImpl<T> newMutator(FateId fateId) { - return new FateMutatorImpl<>(context, tableName, fateId); + return new FateMutatorImpl<>(context, tableName, fateId, writer); } private <R> R scanTx(Function<Scanner,R> func) { @@ -491,15 +503,15 @@ public class UserFateStore<T> extends AbstractFateStore<T> { } final Map<FateId,ConditionalWriter.Status> resultsMap = new HashMap<>(); - try (ConditionalWriter writer = context.createConditionalWriter(tableName)) { - Iterator<ConditionalWriter.Result> results = writer + try { + Iterator<ConditionalWriter.Result> results = writer.get() .write(pending.values().stream().map(pair -> pair.getFirst().getMutation()).iterator()); while (results.hasNext()) { var result = results.next(); var row = new Text(result.getMutation().getRow()); resultsMap.put(FateId.from(FateInstanceType.USER, row.toString()), result.getStatus()); } - } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + } catch (AccumuloException | AccumuloSecurityException e) { throw new IllegalStateException(e); } return resultsMap; @@ -689,4 +701,17 @@ public class UserFateStore<T> extends AbstractFateStore<T> { "Position %s is not in the valid range of [0,%s]", position, MAX_REPOS); return position; } + + private ConditionalWriter createConditionalWriterForFateTable(String tableName) + throws TableNotFoundException { + int maxThreads = + context.getConfiguration().getCount(Property.MANAGER_FATE_CONDITIONAL_WRITER_THREADS_MAX); + ConditionalWriterConfig cwConfig = new ConditionalWriterConfig().setMaxWriteThreads(maxThreads); + return context.createConditionalWriter(tableName, cwConfig); + } + + @Override + public void close() { + writer.get().close(); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java index 2d769bce6a..72ce79e1cc 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java @@ -653,6 +653,11 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { } } + @Override + public void close() { + // no-op + } + protected static class FateData<T> { final TStatus status; final Optional<FateKey> fateKey; diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index 29ba43fa0a..331401fc6b 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -193,6 +193,11 @@ public class FateLogger { public void deleteDeadReservations() { store.deleteDeadReservations(); } + + @Override + public void close() { + store.close(); + } }; } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 8e881fddad..e4d057fc10 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -291,4 +291,9 @@ public class TestStore implements FateStore<String> { public boolean isDeferredOverflow() { return false; } + + @Override + public void close() { + // no-op + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index d5bb3f4261..0d7d30d350 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -959,7 +959,6 @@ public class Admin implements KeywordExecutable { var zTableLocksPath = context.getServerPaths().createTableLocksPath(); var zk = context.getZooSession(); ServiceLock adminLock = null; - Map<FateInstanceType,FateStore<Admin>> fateStores; Map<FateInstanceType,ReadOnlyFateStore<Admin>> readOnlyFateStores = null; try { @@ -967,20 +966,22 @@ public class Admin implements KeywordExecutable { cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList); } else if (fateOpsCommand.fail) { adminLock = createAdminLock(context); - fateStores = createFateStores(context, zk, adminLock); - for (String fateIdStr : fateOpsCommand.fateIdList) { - if (!admin.prepFail(fateStores, fateIdStr)) { - throw new AccumuloException("Could not fail transaction: " + fateIdStr); + try (var fateStores = createFateStores(context, zk, adminLock)) { + for (String fateIdStr : fateOpsCommand.fateIdList) { + if (!admin.prepFail(fateStores.getStoresMap(), fateIdStr)) { + throw new AccumuloException("Could not fail transaction: " + fateIdStr); + } } } } else if (fateOpsCommand.delete) { adminLock = createAdminLock(context); - fateStores = createFateStores(context, zk, adminLock); - for (String fateIdStr : fateOpsCommand.fateIdList) { - if (!admin.prepDelete(fateStores, fateIdStr)) { - throw new AccumuloException("Could not delete transaction: " + fateIdStr); + try (var fateStores = createFateStores(context, zk, adminLock)) { + for (String fateIdStr : fateOpsCommand.fateIdList) { + if (!admin.prepDelete(fateStores.getStoresMap(), fateIdStr)) { + throw new AccumuloException("Could not delete transaction: " + fateIdStr); + } + admin.deleteLocks(zk, zTableLocksPath, fateIdStr); } - admin.deleteLocks(zk, zTableLocksPath, fateIdStr); } } @@ -991,7 +992,7 @@ public class Admin implements KeywordExecutable { getCmdLineStatusFilters(fateOpsCommand.states); EnumSet<FateInstanceType> typesFilter = getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes); - readOnlyFateStores = createReadOnlyFateStores(context, zk, Constants.ZFATE); + readOnlyFateStores = createReadOnlyFateStores(context, zk); admin.print(readOnlyFateStores, zk, zTableLocksPath, new Formatter(System.out), fateIdFilter, statusFilter, typesFilter); // print line break at the end @@ -1000,7 +1001,7 @@ public class Admin implements KeywordExecutable { if (fateOpsCommand.summarize) { if (readOnlyFateStores == null) { - readOnlyFateStores = createReadOnlyFateStores(context, zk, Constants.ZFATE); + readOnlyFateStores = createReadOnlyFateStores(context, zk); } summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores, zTableLocksPath); } @@ -1011,20 +1012,19 @@ public class Admin implements KeywordExecutable { } } - private Map<FateInstanceType,FateStore<Admin>> createFateStores(ServerContext context, - ZooSession zk, ServiceLock adminLock) throws InterruptedException, KeeperException { + private FateStores createFateStores(ServerContext context, ZooSession zk, ServiceLock adminLock) + throws InterruptedException, KeeperException { var lockId = adminLock.getLockID(); MetaFateStore<Admin> mfs = new MetaFateStore<>(zk, lockId, null); UserFateStore<Admin> ufs = new UserFateStore<>(context, SystemTables.FATE.tableName(), lockId, null); - return Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); + return new FateStores(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); } - private Map<FateInstanceType,ReadOnlyFateStore<Admin>> - createReadOnlyFateStores(ServerContext context, ZooSession zk, String fateZkPath) - throws InterruptedException, KeeperException { - MetaFateStore<Admin> readOnlyMFS = new MetaFateStore<>(zk, null, null); - UserFateStore<Admin> readOnlyUFS = + private Map<FateInstanceType,ReadOnlyFateStore<Admin>> createReadOnlyFateStores( + ServerContext context, ZooSession zk) throws InterruptedException, KeeperException { + ReadOnlyFateStore<Admin> readOnlyMFS = new MetaFateStore<>(zk, null, null); + ReadOnlyFateStore<Admin> readOnlyUFS = new UserFateStore<>(context, SystemTables.FATE.tableName(), null, null); return Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER, readOnlyUFS); } @@ -1343,4 +1343,27 @@ public class Admin implements KeywordExecutable { System.out.println("-".repeat(50)); System.out.println(); } + + /** + * Wrapper around the fate stores + */ + private static class FateStores implements AutoCloseable { + private final Map<FateInstanceType,FateStore<Admin>> storesMap; + + private FateStores(FateInstanceType type1, FateStore<Admin> store1, FateInstanceType type2, + FateStore<Admin> store2) { + storesMap = Map.of(type1, store1, type2, store2); + } + + private Map<FateInstanceType,FateStore<Admin>> getStoresMap() { + return storesMap; + } + + @Override + public void close() { + for (var fs : storesMap.values()) { + fs.close(); + } + } + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/TableLocksCheckRunner.java b/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/TableLocksCheckRunner.java index 47f9e90e4b..a771f595a9 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/TableLocksCheckRunner.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/TableLocksCheckRunner.java @@ -61,54 +61,55 @@ public class TableLocksCheckRunner implements CheckRunner { final AdminUtil<Admin> admin = new AdminUtil<>(); final var zTableLocksPath = context.getServerPaths().createTableLocksPath(); final var zk = context.getZooSession(); - final MetaFateStore<Admin> mfs = new MetaFateStore<>(zk, null, null); - final UserFateStore<Admin> ufs = - new UserFateStore<>(context, SystemTables.FATE.tableName(), null, null); + try (final MetaFateStore<Admin> mfs = new MetaFateStore<>(zk, null, null); final UserFateStore< + Admin> ufs = new UserFateStore<>(context, SystemTables.FATE.tableName(), null, null)) { - log.trace("Ensuring table and namespace locks are valid..."); + log.trace("Ensuring table and namespace locks are valid..."); - var tableIds = context.tableOperations().tableIdMap().values(); - var namespaceIds = context.namespaceOperations().namespaceIdMap().values(); - List<String> lockedIds = - context.getZooSession().asReader().getChildren(zTableLocksPath.toString()); - boolean locksExist = !lockedIds.isEmpty(); + var tableIds = context.tableOperations().tableIdMap().values(); + var namespaceIds = context.namespaceOperations().namespaceIdMap().values(); + List<String> lockedIds = + context.getZooSession().asReader().getChildren(zTableLocksPath.toString()); + boolean locksExist = !lockedIds.isEmpty(); - if (locksExist) { - lockedIds.removeAll(tableIds); - lockedIds.removeAll(namespaceIds); - if (!lockedIds.isEmpty()) { - status = Admin.CheckCommand.CheckStatus.FAILED; - log.warn("...Some table and namespace locks are INVALID (the table/namespace DNE): " - + lockedIds); + if (locksExist) { + lockedIds.removeAll(tableIds); + lockedIds.removeAll(namespaceIds); + if (!lockedIds.isEmpty()) { + status = Admin.CheckCommand.CheckStatus.FAILED; + log.warn("...Some table and namespace locks are INVALID (the table/namespace DNE): " + + lockedIds); + } else { + log.trace("...locks are valid"); + } } else { - log.trace("...locks are valid"); + log.trace("...no locks present"); } - } else { - log.trace("...no locks present"); - } - log.trace("Ensuring table and namespace locks are associated with a FATE op..."); + log.trace("Ensuring table and namespace locks are associated with a FATE op..."); - if (locksExist) { - final var fateStatus = - admin.getStatus(Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs), zk, - zTableLocksPath, null, null, null); - if (!fateStatus.getDanglingHeldLocks().isEmpty() - || !fateStatus.getDanglingWaitingLocks().isEmpty()) { - status = Admin.CheckCommand.CheckStatus.FAILED; - log.warn("The following locks did not have an associated FATE operation\n"); - for (Map.Entry<FateId,List<String>> entry : fateStatus.getDanglingHeldLocks().entrySet()) { - log.warn("fateId: " + entry.getKey() + " locked: " + entry.getValue()); - } - for (Map.Entry<FateId,List<String>> entry : fateStatus.getDanglingWaitingLocks() - .entrySet()) { - log.warn("fateId: " + entry.getKey() + " locking: " + entry.getValue()); + if (locksExist) { + final var fateStatus = + admin.getStatus(Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs), zk, + zTableLocksPath, null, null, null); + if (!fateStatus.getDanglingHeldLocks().isEmpty() + || !fateStatus.getDanglingWaitingLocks().isEmpty()) { + status = Admin.CheckCommand.CheckStatus.FAILED; + log.warn("The following locks did not have an associated FATE operation\n"); + for (Map.Entry<FateId,List<String>> entry : fateStatus.getDanglingHeldLocks() + .entrySet()) { + log.warn("fateId: " + entry.getKey() + " locked: " + entry.getValue()); + } + for (Map.Entry<FateId,List<String>> entry : fateStatus.getDanglingWaitingLocks() + .entrySet()) { + log.warn("fateId: " + entry.getKey() + " locking: " + entry.getValue()); + } + } else { + log.trace("...locks are valid"); } } else { - log.trace("...locks are valid"); + log.trace("...no locks present"); } - } else { - log.trace("...no locks present"); } return status; diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 6365df4a80..7c9a4c5d56 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -252,10 +252,10 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { @Test public void testCompactionCommitAndDeadDetectionRoot() throws Exception { var ctx = getCluster().getServerContext(); - FateStore<Manager> metaFateStore = - new MetaFateStore<>(ctx.getZooSession(), testLock.getLockID(), null); - try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build(); + FateStore<Manager> metaFateStore = + new MetaFateStore<>(ctx.getZooSession(), testLock.getLockID(), null)) { var tableId = ctx.getTableId(SystemTables.ROOT.tableName()); var allCids = new HashMap<TableId,List<ExternalCompactionId>>(); var fateId = createCompactionCommitAndDeadMetadata(c, metaFateStore, @@ -271,10 +271,10 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { @Test public void testCompactionCommitAndDeadDetectionMeta() throws Exception { var ctx = getCluster().getServerContext(); - FateStore<Manager> metaFateStore = - new MetaFateStore<>(ctx.getZooSession(), testLock.getLockID(), null); - try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build(); + FateStore<Manager> metaFateStore = + new MetaFateStore<>(ctx.getZooSession(), testLock.getLockID(), null)) { // Metadata table by default already has 2 tablets var tableId = ctx.getTableId(SystemTables.METADATA.tableName()); var allCids = new HashMap<TableId,List<ExternalCompactionId>>(); @@ -293,9 +293,9 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { var ctx = getCluster().getServerContext(); final String tableName = getUniqueNames(1)[0]; - try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - UserFateStore<Manager> userFateStore = - new UserFateStore<>(ctx, SystemTables.FATE.tableName(), testLock.getLockID(), null); + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build(); + UserFateStore<Manager> userFateStore = + new UserFateStore<>(ctx, SystemTables.FATE.tableName(), testLock.getLockID(), null)) { SortedSet<Text> splits = new TreeSet<>(); splits.add(new Text(row(MAX_DATA / 2))); c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits)); @@ -317,11 +317,11 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { var ctx = getCluster().getServerContext(); final String userTable = getUniqueNames(1)[0]; - try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - UserFateStore<Manager> userFateStore = - new UserFateStore<>(ctx, SystemTables.FATE.tableName(), testLock.getLockID(), null); - FateStore<Manager> metaFateStore = - new MetaFateStore<>(ctx.getZooSession(), testLock.getLockID(), null); + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build(); + FateStore<Manager> userFateStore = + new UserFateStore<>(ctx, SystemTables.FATE.tableName(), testLock.getLockID(), null); + FateStore<Manager> metaFateStore = + new MetaFateStore<>(ctx.getZooSession(), testLock.getLockID(), null)) { SortedSet<Text> splits = new TreeSet<>(); splits.add(new Text(row(MAX_DATA / 2))); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresITBase.java b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresITBase.java index 70fa454965..ae6d485d05 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresITBase.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresITBase.java @@ -73,53 +73,55 @@ public abstract class MultipleStoresITBase extends SharedMiniClusterBase { final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50); final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52); Map<FateId,FateStore.FateReservation> activeReservations; - final FateStore<SleepingTestEnv> store1 = testStoreFactory.create(lock1, null); - final FateStore<SleepingTestEnv> store2 = testStoreFactory.create(lock2, null); - final FateId fakeFateId = FateId.from(store1.type(), UUID.randomUUID()); + try (final FateStore<SleepingTestEnv> store1 = testStoreFactory.create(lock1, null); + final FateStore<SleepingTestEnv> store2 = testStoreFactory.create(lock2, null)) { + final FateId fakeFateId = FateId.from(store1.type(), UUID.randomUUID()); - // Create the fate ids using store1 - for (int i = 0; i < numFateIds; i++) { - assertTrue(allIds.add(store1.create())); - } - assertEquals(numFateIds, allIds.size()); - - // Reserve half the fate ids using store1 and rest using store2, after reserving a fate id in - // one, should not be able to reserve the same in the other. Should also not matter that all the - // ids were created using store1 - int count = 0; - for (FateId fateId : allIds) { - if (count % 2 == 0) { - reservations.add(store1.reserve(fateId)); - assertTrue(store2.tryReserve(fateId).isEmpty()); - } else { - reservations.add(store2.reserve(fateId)); - assertTrue(store1.tryReserve(fateId).isEmpty()); + // Create the fate ids using store1 + for (int i = 0; i < numFateIds; i++) { + assertTrue(allIds.add(store1.create())); } - count++; - } - // Try to reserve a non-existent fate id - assertTrue(store1.tryReserve(fakeFateId).isEmpty()); - assertTrue(store2.tryReserve(fakeFateId).isEmpty()); - // Both stores should return the same reserved transactions - activeReservations = store1.getActiveReservations(); - assertEquals(allIds, activeReservations.keySet()); - activeReservations = store2.getActiveReservations(); - assertEquals(allIds, activeReservations.keySet()); - - // Test setting/getting the TStatus and unreserving the transactions - for (int i = 0; i < allIds.size(); i++) { - var reservation = reservations.get(i); - assertEquals(ReadOnlyFateStore.TStatus.NEW, reservation.getStatus()); - reservation.setStatus(ReadOnlyFateStore.TStatus.SUBMITTED); - assertEquals(ReadOnlyFateStore.TStatus.SUBMITTED, reservation.getStatus()); - reservation.delete(); - reservation.unreserve(Duration.ofMillis(0)); - // Attempt to set a status on a tx that has been unreserved (should throw exception) - assertThrows(IllegalStateException.class, - () -> reservation.setStatus(ReadOnlyFateStore.TStatus.NEW)); + assertEquals(numFateIds, allIds.size()); + + // Reserve half the fate ids using store1 and rest using store2, after reserving a fate id in + // one, should not be able to reserve the same in the other. Should also not matter that all + // the + // ids were created using store1 + int count = 0; + for (FateId fateId : allIds) { + if (count % 2 == 0) { + reservations.add(store1.reserve(fateId)); + assertTrue(store2.tryReserve(fateId).isEmpty()); + } else { + reservations.add(store2.reserve(fateId)); + assertTrue(store1.tryReserve(fateId).isEmpty()); + } + count++; + } + // Try to reserve a non-existent fate id + assertTrue(store1.tryReserve(fakeFateId).isEmpty()); + assertTrue(store2.tryReserve(fakeFateId).isEmpty()); + // Both stores should return the same reserved transactions + activeReservations = store1.getActiveReservations(); + assertEquals(allIds, activeReservations.keySet()); + activeReservations = store2.getActiveReservations(); + assertEquals(allIds, activeReservations.keySet()); + + // Test setting/getting the TStatus and unreserving the transactions + for (int i = 0; i < allIds.size(); i++) { + var reservation = reservations.get(i); + assertEquals(ReadOnlyFateStore.TStatus.NEW, reservation.getStatus()); + reservation.setStatus(ReadOnlyFateStore.TStatus.SUBMITTED); + assertEquals(ReadOnlyFateStore.TStatus.SUBMITTED, reservation.getStatus()); + reservation.delete(); + reservation.unreserve(Duration.ofMillis(0)); + // Attempt to set a status on a tx that has been unreserved (should throw exception) + assertThrows(IllegalStateException.class, + () -> reservation.setStatus(ReadOnlyFateStore.TStatus.NEW)); + } + assertTrue(store1.getActiveReservations().isEmpty()); + assertTrue(store2.getActiveReservations().isEmpty()); } - assertTrue(store1.getActiveReservations().isEmpty()); - assertTrue(store2.getActiveReservations().isEmpty()); } @Test @@ -132,11 +134,12 @@ public abstract class MultipleStoresITBase extends SharedMiniClusterBase { // Tests that reserve() doesn't hang indefinitely and instead throws an error // on reserve() a non-existent transaction. final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50); - final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock, null); - final FateId fakeFateId = FateId.from(store.type(), UUID.randomUUID()); + try (final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock, null)) { + final FateId fakeFateId = FateId.from(store.type(), UUID.randomUUID()); - var err = assertThrows(IllegalStateException.class, () -> store.reserve(fakeFateId)); - assertTrue(err.getMessage().contains(fakeFateId.canonical())); + var err = assertThrows(IllegalStateException.class, () -> store.reserve(fakeFateId)); + assertTrue(err.getMessage().contains(fakeFateId.canonical())); + } } @Test @@ -150,30 +153,32 @@ public abstract class MultipleStoresITBase extends SharedMiniClusterBase { final Set<FateId> allIds = new HashSet<>(); final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>(); final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50); - final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock, null); - - // Create some FateIds and ensure that they can be reserved - for (int i = 0; i < numFateIds; i++) { - FateId fateId = store.create(); - assertTrue(allIds.add(fateId)); - var reservation = store.tryReserve(fateId); - assertFalse(reservation.isEmpty()); - reservations.add(reservation.orElseThrow()); - } - assertEquals(numFateIds, allIds.size()); + try (final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock, null)) { - // Try to reserve again, should not reserve - for (FateId fateId : allIds) { - assertTrue(store.tryReserve(fateId).isEmpty()); - } + // Create some FateIds and ensure that they can be reserved + for (int i = 0; i < numFateIds; i++) { + FateId fateId = store.create(); + assertTrue(allIds.add(fateId)); + var reservation = store.tryReserve(fateId); + assertFalse(reservation.isEmpty()); + reservations.add(reservation.orElseThrow()); + } + assertEquals(numFateIds, allIds.size()); - // Unreserve all the FateIds - for (var reservation : reservations) { - reservation.unreserve(Duration.ofMillis(0)); - } - // Try to unreserve again (should throw exception) - for (var reservation : reservations) { - assertThrows(IllegalStateException.class, () -> reservation.unreserve(Duration.ofMillis(0))); + // Try to reserve again, should not reserve + for (FateId fateId : allIds) { + assertTrue(store.tryReserve(fateId).isEmpty()); + } + + // Unreserve all the FateIds + for (var reservation : reservations) { + reservation.unreserve(Duration.ofMillis(0)); + } + // Try to unreserve again (should throw exception) + for (var reservation : reservations) { + assertThrows(IllegalStateException.class, + () -> reservation.unreserve(Duration.ofMillis(0))); + } } } @@ -188,39 +193,40 @@ public abstract class MultipleStoresITBase extends SharedMiniClusterBase { final Set<FateId> allIds = new HashSet<>(); final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>(); final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50); - final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock, null); - - // Create some FateIds and ensure that they can be reserved - for (int i = 0; i < numFateIds; i++) { - FateId fateId = store.create(); - assertTrue(allIds.add(fateId)); - var reservation = store.tryReserve(fateId); - assertFalse(reservation.isEmpty()); - reservations.add(reservation.orElseThrow()); - } - assertEquals(numFateIds, allIds.size()); + try (final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock, null)) { - // Unreserve all - for (var reservation : reservations) { - reservation.unreserve(Duration.ofMillis(0)); - } + // Create some FateIds and ensure that they can be reserved + for (int i = 0; i < numFateIds; i++) { + FateId fateId = store.create(); + assertTrue(allIds.add(fateId)); + var reservation = store.tryReserve(fateId); + assertFalse(reservation.isEmpty()); + reservations.add(reservation.orElseThrow()); + } + assertEquals(numFateIds, allIds.size()); - // Ensure they can be reserved again, and delete and unreserve this time - for (FateId fateId : allIds) { - // Verify that the tx status is still NEW after unreserving since it hasn't been deleted - assertEquals(ReadOnlyFateStore.TStatus.NEW, store.read(fateId).getStatus()); - var reservation = store.tryReserve(fateId); - assertFalse(reservation.isEmpty()); - reservation.orElseThrow().delete(); - reservation.orElseThrow().unreserve(Duration.ofMillis(0)); - } + // Unreserve all + for (var reservation : reservations) { + reservation.unreserve(Duration.ofMillis(0)); + } + + // Ensure they can be reserved again, and delete and unreserve this time + for (FateId fateId : allIds) { + // Verify that the tx status is still NEW after unreserving since it hasn't been deleted + assertEquals(ReadOnlyFateStore.TStatus.NEW, store.read(fateId).getStatus()); + var reservation = store.tryReserve(fateId); + assertFalse(reservation.isEmpty()); + reservation.orElseThrow().delete(); + reservation.orElseThrow().unreserve(Duration.ofMillis(0)); + } - for (FateId fateId : allIds) { - // Verify that the tx is now unknown since it has been deleted - assertEquals(ReadOnlyFateStore.TStatus.UNKNOWN, store.read(fateId).getStatus()); - // Attempt to reserve a deleted txn, should throw an exception and not wait indefinitely - var err = assertThrows(IllegalStateException.class, () -> store.reserve(fateId)); - assertTrue(err.getMessage().contains(fateId.canonical())); + for (FateId fateId : allIds) { + // Verify that the tx is now unknown since it has been deleted + assertEquals(ReadOnlyFateStore.TStatus.UNKNOWN, store.read(fateId).getStatus()); + // Attempt to reserve a deleted txn, should throw an exception and not wait indefinitely + var err = assertThrows(IllegalStateException.class, () -> store.reserve(fateId)); + assertTrue(err.getMessage().contains(fateId.canonical())); + } } } @@ -239,42 +245,43 @@ public abstract class MultipleStoresITBase extends SharedMiniClusterBase { final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52); final Set<ZooUtil.LockID> liveLocks = new HashSet<>(); final Predicate<ZooUtil.LockID> isLockHeld = liveLocks::contains; - final FateStore<SleepingTestEnv> store1 = testStoreFactory.create(lock1, isLockHeld); - final FateStore<SleepingTestEnv> store2 = testStoreFactory.create(lock2, isLockHeld); + try (final FateStore<SleepingTestEnv> store1 = testStoreFactory.create(lock1, isLockHeld); + final FateStore<SleepingTestEnv> store2 = testStoreFactory.create(lock2, isLockHeld)) { - liveLocks.add(lock1); - liveLocks.add(lock2); - - Fate<SleepingTestEnv> fate1 = new Fate<>(testEnv1, store1, true, Object::toString, - DefaultConfiguration.getInstance(), new ScheduledThreadPoolExecutor(2)); - Fate<SleepingTestEnv> fate2 = new Fate<>(testEnv2, store2, false, Object::toString, - DefaultConfiguration.getInstance(), new ScheduledThreadPoolExecutor(2)); + liveLocks.add(lock1); + liveLocks.add(lock2); - try { - for (int i = 0; i < numFateIds; i++) { - FateId fateId; - // Start half the txns using fate1, and the other half using fate2 - if (i % 2 == 0) { - fateId = fate1.startTransaction(); - fate1.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(), true, "test"); - } else { - fateId = fate2.startTransaction(); - fate2.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(), true, "test"); + Fate<SleepingTestEnv> fate1 = new Fate<>(testEnv1, store1, true, Object::toString, + DefaultConfiguration.getInstance(), new ScheduledThreadPoolExecutor(2)); + Fate<SleepingTestEnv> fate2 = new Fate<>(testEnv2, store2, false, Object::toString, + DefaultConfiguration.getInstance(), new ScheduledThreadPoolExecutor(2)); + + try { + for (int i = 0; i < numFateIds; i++) { + FateId fateId; + // Start half the txns using fate1, and the other half using fate2 + if (i % 2 == 0) { + fateId = fate1.startTransaction(); + fate1.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(), true, "test"); + } else { + fateId = fate2.startTransaction(); + fate2.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(), true, "test"); + } + allIds.add(fateId); } - allIds.add(fateId); - } - assertEquals(numFateIds, allIds.size()); + assertEquals(numFateIds, allIds.size()); - // Should be able to wait for completion on any fate instance - for (FateId fateId : allIds) { - fate2.waitForCompletion(fateId); + // Should be able to wait for completion on any fate instance + for (FateId fateId : allIds) { + fate2.waitForCompletion(fateId); + } + // Ensure that all txns have been executed and have only been executed once + assertTrue(Collections.disjoint(testEnv1.executedOps, testEnv2.executedOps)); + assertEquals(allIds, Sets.union(testEnv1.executedOps, testEnv2.executedOps)); + } finally { + fate1.shutdown(1, TimeUnit.MINUTES); + fate2.shutdown(1, TimeUnit.MINUTES); } - // Ensure that all txns have been executed and have only been executed once - assertTrue(Collections.disjoint(testEnv1.executedOps, testEnv2.executedOps)); - assertEquals(allIds, Sets.union(testEnv1.executedOps, testEnv2.executedOps)); - } finally { - fate1.shutdown(1, TimeUnit.MINUTES); - fate2.shutdown(1, TimeUnit.MINUTES); } } @@ -304,73 +311,79 @@ public abstract class MultipleStoresITBase extends SharedMiniClusterBase { final AccumuloConfiguration config = FateTestUtil.createTestFateConfig(numThreads); Map<FateId,FateStore.FateReservation> reservations; - final FateStore<LatchTestEnv> store1 = testStoreFactory.create(lock1, isLockHeld); - liveLocks.add(lock1); - Fate<LatchTestEnv> fate1 = null; - Fate<LatchTestEnv> fate2 = null; - - try { - fate1 = new FastFate<>(testEnv1, store1, true, Object::toString, config); - // Ensure nothing is reserved yet - assertTrue(store1.getActiveReservations().isEmpty()); - - // Create transactions - for (int i = 0; i < numFateIds; i++) { - FateId fateId; - fateId = fate1.startTransaction(); - fate1.seedTransaction(TEST_FATE_OP, fateId, new LatchTestRepo(), true, "test"); - allIds.add(fateId); - } - assertEquals(numFateIds, allIds.size()); - - // Wait for all the fate worker threads to start working on the transactions - Wait.waitFor(() -> testEnv1.numWorkers.get() == numFateIds); - // Each fate worker will be hung up working (IN_PROGRESS) on a single transaction + try (final FateStore<LatchTestEnv> store1 = testStoreFactory.create(lock1, isLockHeld)) { + liveLocks.add(lock1); + Fate<LatchTestEnv> fate1 = null; + Fate<LatchTestEnv> fate2 = null; - // Verify store1 has the transactions reserved and that they were reserved with lock1 - reservations = store1.getActiveReservations(); - assertEquals(allIds, reservations.keySet()); - reservations.values().forEach(res -> assertEquals(lock1, res.getLockID())); - - final FateStore<LatchTestEnv> store2 = testStoreFactory.create(lock2, isLockHeld); - - // Verify store2 can see the reserved transactions even though they were reserved using - // store1 - reservations = store2.getActiveReservations(); - assertEquals(allIds, reservations.keySet()); - reservations.values().forEach(res -> assertEquals(lock1, res.getLockID())); - - // Simulate what would happen if the Manager using the Fate object (fate1) died. - // isLockHeld would return false for the LockId of the Manager that died (in this case, lock1) - // and true for the new Manager's lock (lock2) - liveLocks.remove(lock1); - liveLocks.add(lock2); + try { + fate1 = new FastFate<>(testEnv1, store1, true, Object::toString, config); + // Ensure nothing is reserved yet + assertTrue(store1.getActiveReservations().isEmpty()); - // Create the new Fate/start the Fate threads (the work finder and the workers). - // Don't run another dead reservation cleaner since we already have one running from fate1. - fate2 = new Fate<>(testEnv2, store2, false, Object::toString, config, - new ScheduledThreadPoolExecutor(2)); - - // Wait for the "dead" reservations to be deleted and picked up again (reserved using - // fate2/store2/lock2 now). - // They are considered "dead" if they are held by lock1 in this test. We don't have to worry - // about fate1/store1/lock1 being used to reserve the transactions again since all - // the workers for fate1 are hung up - Wait.waitFor(() -> { - Map<FateId,FateStore.FateReservation> store2Reservations = store2.getActiveReservations(); - boolean allReservedWithLock2 = - store2Reservations.values().stream().allMatch(entry -> entry.getLockID().equals(lock2)); - return store2Reservations.keySet().equals(allIds) && allReservedWithLock2; - }, fate1.getDeadResCleanupDelay().toMillis() * 2); - } finally { - // Finish work and shutdown - testEnv1.workersLatch.countDown(); - testEnv2.workersLatch.countDown(); - if (fate1 != null) { - fate1.shutdown(1, TimeUnit.MINUTES); - } - if (fate2 != null) { - fate2.shutdown(1, TimeUnit.MINUTES); + // Create transactions + for (int i = 0; i < numFateIds; i++) { + FateId fateId; + fateId = fate1.startTransaction(); + fate1.seedTransaction(TEST_FATE_OP, fateId, new LatchTestRepo(), true, "test"); + allIds.add(fateId); + } + assertEquals(numFateIds, allIds.size()); + + // Wait for all the fate worker threads to start working on the transactions + Wait.waitFor(() -> testEnv1.numWorkers.get() == numFateIds); + // Each fate worker will be hung up working (IN_PROGRESS) on a single transaction + + // Verify store1 has the transactions reserved and that they were reserved with lock1 + reservations = store1.getActiveReservations(); + assertEquals(allIds, reservations.keySet()); + reservations.values().forEach(res -> assertEquals(lock1, res.getLockID())); + + try (final FateStore<LatchTestEnv> store2 = testStoreFactory.create(lock2, isLockHeld)) { + + // Verify store2 can see the reserved transactions even though they were reserved using + // store1 + reservations = store2.getActiveReservations(); + assertEquals(allIds, reservations.keySet()); + reservations.values().forEach(res -> assertEquals(lock1, res.getLockID())); + + // Simulate what would happen if the Manager using the Fate object (fate1) died. + // isLockHeld would return false for the LockId of the Manager that died (in this case, + // lock1) + // and true for the new Manager's lock (lock2) + liveLocks.remove(lock1); + liveLocks.add(lock2); + + // Create the new Fate/start the Fate threads (the work finder and the workers). + // Don't run another dead reservation cleaner since we already have one running from + // fate1. + fate2 = new Fate<>(testEnv2, store2, false, Object::toString, config, + new ScheduledThreadPoolExecutor(2)); + + // Wait for the "dead" reservations to be deleted and picked up again (reserved using + // fate2/store2/lock2 now). + // They are considered "dead" if they are held by lock1 in this test. We don't have to + // worry + // about fate1/store1/lock1 being used to reserve the transactions again since all + // the workers for fate1 are hung up + Wait.waitFor(() -> { + Map<FateId,FateStore.FateReservation> store2Reservations = + store2.getActiveReservations(); + boolean allReservedWithLock2 = store2Reservations.values().stream() + .allMatch(entry -> entry.getLockID().equals(lock2)); + return store2Reservations.keySet().equals(allIds) && allReservedWithLock2; + }, fate1.getDeadResCleanupDelay().toMillis() * 2); + } + } finally { + // Finish work and shutdown + testEnv1.workersLatch.countDown(); + testEnv2.workersLatch.countDown(); + if (fate1 != null) { + fate1.shutdown(1, TimeUnit.MINUTES); + } + if (fate2 != null) { + fate2.shutdown(1, TimeUnit.MINUTES); + } } } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT_SimpleSuite.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT_SimpleSuite.java index bddab38cc6..b890f09f1a 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT_SimpleSuite.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT_SimpleSuite.java @@ -25,6 +25,7 @@ import java.util.UUID; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.zookeeper.ZooSession; @@ -44,11 +45,13 @@ public class MetaFateExecutionOrderIT_SimpleSuite extends FateExecutionOrderITBa try (var zk = new ZooSession(getClass().getSimpleName() + ".mkdirs", conf)) { zk.asReaderWriter().mkdirs(ZK_ROOT); } - try (var zk = new ZooSession(getClass().getSimpleName() + ".fakeroot", - conf.get(Property.INSTANCE_ZK_HOST) + ZK_ROOT, - (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT), - conf.get(Property.INSTANCE_SECRET))) { - testMethod.execute(new MetaFateStore<>(zk, createDummyLockID(), null), sctx); + try ( + var zk = new ZooSession(getClass().getSimpleName() + ".fakeroot", + conf.get(Property.INSTANCE_ZK_HOST) + ZK_ROOT, + (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT), + conf.get(Property.INSTANCE_SECRET)); + FateStore<FeoTestEnv> fs = new MetaFateStore<>(zk, createDummyLockID(), null)) { + testMethod.execute(fs, sctx); } } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java index dd7082709c..cfd28ecda1 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java @@ -30,6 +30,7 @@ import java.io.UncheckedIOException; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.zookeeper.ZooSession; @@ -64,8 +65,10 @@ public class MetaFateIT extends FateITBase { expect(sctx.getZooSession()).andReturn(zk).anyTimes(); replay(sctx); - testMethod.execute( - new MetaFateStore<>(zk, createDummyLockID(), null, maxDeferred, fateIdGenerator), sctx); + try (FateStore<TestEnv> fs = + new MetaFateStore<>(zk, createDummyLockID(), null, maxDeferred, fateIdGenerator)) { + testMethod.execute(fs, sctx); + } } @Override diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java index 2f53ee7697..4f81d2857e 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java @@ -42,7 +42,9 @@ public class MetaFateOpsCommandsIT extends FateOpsCommandsITBase { ServerContext context = getCluster().getServerContext(); var zk = context.getZooSession(); // test should not be reserving txns or checking reservations, so null lockID and isLockHeld - testMethod.execute(new MetaFateStore<>(zk, null, null), context); + try (FateStore<LatchTestEnv> fs = new MetaFateStore<>(zk, null, null)) { + testMethod.execute(fs, context); + } } /** @@ -62,7 +64,9 @@ public class MetaFateOpsCommandsIT extends FateOpsCommandsITBase { ZooUtil.LockID lockID = testLock.getLockID(); Predicate<ZooUtil.LockID> isLockHeld = lock -> ServiceLock.isLockHeld(context.getZooCache(), lock); - testMethod.execute(new MetaFateStore<>(zk, lockID, isLockHeld), context); + try (FateStore<LatchTestEnv> fs = new MetaFateStore<>(zk, lockID, isLockHeld)) { + testMethod.execute(fs, context); + } } finally { if (testLock != null) { testLock.unlock(); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolsWatcherIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolsWatcherIT.java index 1fc61c1c00..5c5b26d2e0 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolsWatcherIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolsWatcherIT.java @@ -26,6 +26,7 @@ import static org.easymock.EasyMock.replay; import java.io.File; import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FatePoolsWatcherITBase; @@ -56,7 +57,9 @@ public class MetaFatePoolsWatcherIT extends FatePoolsWatcherITBase { expect(sctx.getZooSession()).andReturn(zk).anyTimes(); replay(sctx); - testMethod.execute( - new MetaFateStore<>(zk, createDummyLockID(), null, maxDeferred, fateIdGenerator), sctx); + try (FateStore<PoolResizeTestEnv> fs = + new MetaFateStore<>(zk, createDummyLockID(), null, maxDeferred, fateIdGenerator)) { + testMethod.execute(fs, sctx); + } } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java index 9f7ad526ae..4735e9ee6e 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java @@ -26,6 +26,7 @@ import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.test.fate.FateStatusEnforcementITBase; import org.apache.accumulo.test.fate.FateTestUtil; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.io.TempDir; @@ -50,4 +51,9 @@ public class MetaFateStatusEnforcementIT extends FateStatusEnforcementITBase { fateId = store.create(); txStore = store.reserve(fateId); } + + @AfterEach + public void afterEachTeardown() { + store.close(); + } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java index 755ba42d7d..f41f993d99 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java @@ -73,13 +73,13 @@ public class MetaFateStoreFateIT extends FateStoreITBase { ServerContext sctx = createMock(ServerContext.class); expect(sctx.getZooSession()).andReturn(FateTestUtil.MetaFateZKSetup.getZk()).anyTimes(); replay(sctx); - MetaFateStore<TestEnv> store = new MetaFateStore<>(FateTestUtil.MetaFateZKSetup.getZk(), - createDummyLockID(), null, maxDeferred, fateIdGenerator); - - // Check that the store has no transactions before and after each test - assertEquals(0, store.list().count()); - testMethod.execute(store, sctx); - assertEquals(0, store.list().count()); + try (FateStore<TestEnv> store = new MetaFateStore<>(FateTestUtil.MetaFateZKSetup.getZk(), + createDummyLockID(), null, maxDeferred, fateIdGenerator)) { + // Check that the store has no transactions before and after each test + assertEquals(0, store.list().count()); + testMethod.execute(store, sctx); + assertEquals(0, store.list().count()); + } } @Override diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT_SimpleSuite.java b/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT_SimpleSuite.java index 0260827dbf..acfe46589d 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT_SimpleSuite.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT_SimpleSuite.java @@ -25,9 +25,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import java.time.Duration; import java.util.UUID; +import java.util.function.Supplier; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.clientImpl.ClientContext; @@ -46,6 +49,8 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Suppliers; + public class FateMutatorImplIT_SimpleSuite extends SharedMiniClusterBase { Logger log = LoggerFactory.getLogger(FateMutatorImplIT_SimpleSuite.class); @@ -67,6 +72,16 @@ public class FateMutatorImplIT_SimpleSuite extends SharedMiniClusterBase { return Duration.ofMinutes(5); } + private Supplier<ConditionalWriter> createWriterSupplier(AccumuloClient client, String table) { + return Suppliers.memoize(() -> { + try { + return client.createConditionalWriter(table); + } catch (TableNotFoundException e) { + throw new IllegalStateException(); + } + }); + } + @Test public void putRepo() throws Exception { final String table = getUniqueNames(1)[0]; @@ -77,20 +92,22 @@ public class FateMutatorImplIT_SimpleSuite extends SharedMiniClusterBase { var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + Supplier<ConditionalWriter> writer = createWriterSupplier(client, table); + // add some repos in order - var fateMutator = new FateMutatorImpl<TestEnv>(context, table, fateId); + var fateMutator = new FateMutatorImpl<TestEnv>(context, table, fateId, writer); fateMutator.putRepo(100, new TestRepo("test")).mutate(); - var fateMutator1 = new FateMutatorImpl<TestEnv>(context, table, fateId); + var fateMutator1 = new FateMutatorImpl<TestEnv>(context, table, fateId, writer); fateMutator1.putRepo(99, new TestRepo("test")).mutate(); - var fateMutator2 = new FateMutatorImpl<TestEnv>(context, table, fateId); + var fateMutator2 = new FateMutatorImpl<TestEnv>(context, table, fateId, writer); fateMutator2.putRepo(98, new TestRepo("test")).mutate(); // make sure we cant add a repo that has already been added - var fateMutator3 = new FateMutatorImpl<TestEnv>(context, table, fateId); + var fateMutator3 = new FateMutatorImpl<TestEnv>(context, table, fateId, writer); assertThrows(IllegalStateException.class, () -> fateMutator3.putRepo(98, new TestRepo("test")).mutate(), "Repo in position 98 already exists. Expected to not be able to add it again."); - var fateMutator4 = new FateMutatorImpl<TestEnv>(context, table, fateId); + var fateMutator4 = new FateMutatorImpl<TestEnv>(context, table, fateId, writer); assertThrows(IllegalStateException.class, () -> fateMutator4.putRepo(99, new TestRepo("test")).mutate(), "Repo in position 99 already exists. Expected to not be able to add it again."); @@ -107,67 +124,71 @@ public class FateMutatorImplIT_SimpleSuite extends SharedMiniClusterBase { var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); - // use require status passing all statuses. without the status column present this should fail + Supplier<ConditionalWriter> writer = createWriterSupplier(client, table); + + // use require status passing all statuses. without the status column present this should + // fail assertThrows(IllegalStateException.class, - () -> new FateMutatorImpl<>(context, table, fateId) + () -> new FateMutatorImpl<>(context, table, fateId, writer) .requireStatus(ReadOnlyFateStore.TStatus.values()) .putStatus(ReadOnlyFateStore.TStatus.NEW).mutate()); assertEquals(0, client.createScanner(table).stream().count()); - var status = new FateMutatorImpl<>(context, table, fateId) + var status = new FateMutatorImpl<>(context, table, fateId, writer) .requireStatus(ReadOnlyFateStore.TStatus.values()) .putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate(); assertEquals(REJECTED, status); assertEquals(0, client.createScanner(table).stream().count()); - // use require status without passing any statuses to require that the status column is absent - status = new FateMutatorImpl<>(context, table, fateId).requireStatus() + // use require status without passing any statuses to require that the status column is + // absent + status = new FateMutatorImpl<>(context, table, fateId, writer).requireStatus() .putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate(); assertEquals(ACCEPTED, status); - // try again with requiring an absent status column. this time it should fail because we just + // try again with requiring an absent status column. this time it should fail because we + // just // put status NEW assertThrows(IllegalStateException.class, - () -> new FateMutatorImpl<>(context, table, fateId).requireStatus() + () -> new FateMutatorImpl<>(context, table, fateId, writer).requireStatus() .putStatus(ReadOnlyFateStore.TStatus.NEW).mutate(), "Expected to not be able to use requireStatus() without passing any statuses"); - status = new FateMutatorImpl<>(context, table, fateId).requireStatus() + status = new FateMutatorImpl<>(context, table, fateId, writer).requireStatus() .putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate(); assertEquals(REJECTED, status, "Expected to not be able to use requireStatus() without passing any statuses"); // now use require same with the current status, NEW passed in - status = - new FateMutatorImpl<>(context, table, fateId).requireStatus(ReadOnlyFateStore.TStatus.NEW) - .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).tryMutate(); + status = new FateMutatorImpl<>(context, table, fateId, writer) + .requireStatus(ReadOnlyFateStore.TStatus.NEW) + .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).tryMutate(); assertEquals(ACCEPTED, status); // use require same with an array of statuses, none of which are the current status // (SUBMITTED) assertThrows(IllegalStateException.class, - () -> new FateMutatorImpl<>(context, table, fateId) + () -> new FateMutatorImpl<>(context, table, fateId, writer) .requireStatus(ReadOnlyFateStore.TStatus.NEW, ReadOnlyFateStore.TStatus.UNKNOWN) .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).mutate(), "Expected to not be able to use requireStatus() with statuses that do not match the current status"); - status = new FateMutatorImpl<>(context, table, fateId) + status = new FateMutatorImpl<>(context, table, fateId, writer) .requireStatus(ReadOnlyFateStore.TStatus.NEW, ReadOnlyFateStore.TStatus.UNKNOWN) .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).tryMutate(); assertEquals(REJECTED, status, "Expected to not be able to use requireStatus() with statuses that do not match the current status"); - // use require same with an array of statuses, one of which is the current status (SUBMITTED) - status = new FateMutatorImpl<>(context, table, fateId) + // use require same with an array of statuses, one of which is the current status + // (SUBMITTED) + status = new FateMutatorImpl<>(context, table, fateId, writer) .requireStatus(ReadOnlyFateStore.TStatus.UNKNOWN, ReadOnlyFateStore.TStatus.SUBMITTED) .putStatus(ReadOnlyFateStore.TStatus.IN_PROGRESS).tryMutate(); assertEquals(ACCEPTED, status); // one more time check that we can use require same with the current status (IN_PROGRESS) - status = new FateMutatorImpl<>(context, table, fateId) + status = new FateMutatorImpl<>(context, table, fateId, writer) .requireStatus(ReadOnlyFateStore.TStatus.IN_PROGRESS) .putStatus(ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS).tryMutate(); assertEquals(ACCEPTED, status); - } - } @Test @@ -183,29 +204,33 @@ public class FateMutatorImplIT_SimpleSuite extends SharedMiniClusterBase { var reservation = FateReservation.from(lockID, UUID.randomUUID()); var wrongReservation = FateReservation.from(lockID, UUID.randomUUID()); + Supplier<ConditionalWriter> writer = createWriterSupplier(client, table); + // Ensure that reserving is the only thing we can do - var status = - new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); + var status = new FateMutatorImpl<>(context, table, fateId, writer).putUnreserveTx(reservation) + .tryMutate(); assertEquals(REJECTED, status); - status = new FateMutatorImpl<>(context, table, fateId).putReservedTx(reservation).tryMutate(); + status = new FateMutatorImpl<>(context, table, fateId, writer).putReservedTx(reservation) + .tryMutate(); assertEquals(ACCEPTED, status); // Should not be able to reserve when it is already reserved - status = - new FateMutatorImpl<>(context, table, fateId).putReservedTx(wrongReservation).tryMutate(); + status = new FateMutatorImpl<>(context, table, fateId, writer).putReservedTx(wrongReservation) + .tryMutate(); assertEquals(REJECTED, status); - status = new FateMutatorImpl<>(context, table, fateId).putReservedTx(reservation).tryMutate(); + status = new FateMutatorImpl<>(context, table, fateId, writer).putReservedTx(reservation) + .tryMutate(); assertEquals(REJECTED, status); // Should be able to unreserve - status = new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(wrongReservation) - .tryMutate(); + status = new FateMutatorImpl<>(context, table, fateId, writer) + .putUnreserveTx(wrongReservation).tryMutate(); assertEquals(REJECTED, status); - status = - new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); + status = new FateMutatorImpl<>(context, table, fateId, writer).putUnreserveTx(reservation) + .tryMutate(); assertEquals(ACCEPTED, status); - status = - new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); + status = new FateMutatorImpl<>(context, table, fateId, writer).putUnreserveTx(reservation) + .tryMutate(); assertEquals(REJECTED, status); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT_SimpleSuite.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT_SimpleSuite.java index 9e9387d6a9..264096e06d 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT_SimpleSuite.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT_SimpleSuite.java @@ -24,6 +24,7 @@ import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.user.UserFateStore; import org.apache.accumulo.test.fate.FateExecutionOrderITBase; @@ -32,11 +33,11 @@ public class UserFateExecutionOrderIT_SimpleSuite extends FateExecutionOrderITBa public void executeTest(FateTestExecutor<FeoTestEnv> testMethod, int maxDeferred, AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { var table = getUniqueNames(1)[0]; - try (ClientContext client = - (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + try (ClientContext client = (ClientContext) Accumulo.newClient().from(getClientProps()).build(); + FateStore<FeoTestEnv> fs = new UserFateStore<>(client, table, createDummyLockID(), null, + maxDeferred, fateIdGenerator)) { createFateTable(client, table); - testMethod.execute(new UserFateStore<>(client, table, createDummyLockID(), null, maxDeferred, - fateIdGenerator), getCluster().getServerContext()); + testMethod.execute(fs, getCluster().getServerContext()); client.tableOperations().delete(table); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT_SimpleSuite.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT_SimpleSuite.java index cfed04bea9..0719768683 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT_SimpleSuite.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT_SimpleSuite.java @@ -34,6 +34,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.user.UserFateStore; import org.apache.accumulo.core.fate.user.schema.FateSchema; @@ -65,11 +66,11 @@ public class UserFateIT_SimpleSuite extends FateITBase { public void executeTest(FateTestExecutor<TestEnv> testMethod, int maxDeferred, FateIdGenerator fateIdGenerator) throws Exception { table = getUniqueNames(1)[0]; - try (ClientContext client = - (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + try (ClientContext client = (ClientContext) Accumulo.newClient().from(getClientProps()).build(); + FateStore<TestEnv> fs = new UserFateStore<>(client, table, createDummyLockID(), null, + maxDeferred, fateIdGenerator)) { createFateTable(client, table); - testMethod.execute(new UserFateStore<>(client, table, createDummyLockID(), null, maxDeferred, - fateIdGenerator), getCluster().getServerContext()); + testMethod.execute(fs, getCluster().getServerContext()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java index fca55dc04c..7d84a72f8a 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java @@ -40,9 +40,11 @@ public class UserFateOpsCommandsIT extends FateOpsCommandsITBase { public void executeTest(FateTestExecutor<LatchTestEnv> testMethod, int maxDeferred, AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { var context = getCluster().getServerContext(); - // the test should not be reserving or checking reservations, so null lockID and isLockHeld - testMethod.execute(new UserFateStore<>(context, SystemTables.FATE.tableName(), null, null), - context); + try (FateStore<LatchTestEnv> fs = + new UserFateStore<>(context, SystemTables.FATE.tableName(), null, null)) { + // the test should not be reserving or checking reservations, so null lockID and isLockHeld + testMethod.execute(fs, context); + } } /** @@ -61,8 +63,10 @@ public class UserFateOpsCommandsIT extends FateOpsCommandsITBase { ZooUtil.LockID lockID = testLock.getLockID(); Predicate<ZooUtil.LockID> isLockHeld = lock -> ServiceLock.isLockHeld(context.getZooCache(), lock); - testMethod.execute( - new UserFateStore<>(context, SystemTables.FATE.tableName(), lockID, isLockHeld), context); + try (FateStore<LatchTestEnv> fs = + new UserFateStore<>(context, SystemTables.FATE.tableName(), lockID, isLockHeld)) { + testMethod.execute(fs, context); + } } finally { if (testLock != null) { testLock.unlock(); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolsWatcherIT_SimpleSuite.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolsWatcherIT_SimpleSuite.java index 5c3030a6df..5c4d05d516 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolsWatcherIT_SimpleSuite.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolsWatcherIT_SimpleSuite.java @@ -24,6 +24,7 @@ import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.user.UserFateStore; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.test.fate.FatePoolsWatcherITBase; @@ -48,11 +49,11 @@ public class UserFatePoolsWatcherIT_SimpleSuite extends FatePoolsWatcherITBase { public void executeTest(FateTestExecutor<PoolResizeTestEnv> testMethod, int maxDeferred, AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { table = getUniqueNames(1)[0]; - try (ClientContext client = - (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + try (ClientContext client = (ClientContext) Accumulo.newClient().from(getClientProps()).build(); + FateStore<PoolResizeTestEnv> fs = new UserFateStore<>(client, table, createDummyLockID(), + null, maxDeferred, fateIdGenerator)) { createFateTable(client, table); - testMethod.execute(new UserFateStore<>(client, table, createDummyLockID(), null, maxDeferred, - fateIdGenerator), getCluster().getServerContext()); + testMethod.execute(fs, getCluster().getServerContext()); } } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT_SimpleSuite.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT_SimpleSuite.java index 48dc90ffe1..971d336cfe 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT_SimpleSuite.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT_SimpleSuite.java @@ -57,6 +57,7 @@ public class UserFateStatusEnforcementIT_SimpleSuite extends FateStatusEnforceme @AfterEach public void afterEachTeardown() { + store.close(); client.close(); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT_SimpleSuite.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT_SimpleSuite.java index c1f902a5a7..5072f0176b 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT_SimpleSuite.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT_SimpleSuite.java @@ -30,6 +30,7 @@ import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.user.UserFateStore; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily; import org.apache.accumulo.harness.SharedMiniClusterBase; @@ -54,11 +55,11 @@ public class UserFateStoreFateIT_SimpleSuite extends FateStoreITBase { public void executeTest(FateTestExecutor<TestEnv> testMethod, int maxDeferred, FateIdGenerator fateIdGenerator) throws Exception { String table = getUniqueNames(1)[0] + "fatestore"; - try (ClientContext client = - (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + try (ClientContext client = (ClientContext) Accumulo.newClient().from(getClientProps()).build(); + FateStore<TestEnv> fs = new UserFateStore<>(client, table, createDummyLockID(), null, + maxDeferred, fateIdGenerator)) { createFateTable(client, table); - testMethod.execute(new UserFateStore<>(client, table, createDummyLockID(), null, maxDeferred, - fateIdGenerator), getCluster().getServerContext()); + testMethod.execute(fs, getCluster().getServerContext()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java index 66355ccd72..5b7993403b 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java @@ -256,8 +256,8 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { try { var zk = context.getZooSession(); - MetaFateStore<String> readOnlyMFS = new MetaFateStore<>(zk, null, null); - UserFateStore<String> readOnlyUFS = + ReadOnlyFateStore<String> readOnlyMFS = new MetaFateStore<>(zk, null, null); + ReadOnlyFateStore<String> readOnlyUFS = new UserFateStore<>(context, SystemTables.FATE.tableName(), null, null); var lockPath = context.getServerPaths().createTableLocksPath(tableId); Map<FateInstanceType,ReadOnlyFateStore<String>> readOnlyFateStores = @@ -348,7 +348,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { log.trace("tid: {}", tableId); var zk = context.getZooSession(); - MetaFateStore<String> readOnlyMFS = new MetaFateStore<>(zk, null, null); + ReadOnlyFateStore<String> readOnlyMFS = new MetaFateStore<>(zk, null, null); var lockPath = context.getServerPaths().createTableLocksPath(tableId); AdminUtil.FateStatus fateStatus = admin.getStatus(readOnlyMFS, zk, lockPath, null, null, null); @@ -378,7 +378,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { log.trace("tid: {}", tableId); - UserFateStore<String> readOnlyUFS = + ReadOnlyFateStore<String> readOnlyUFS = new UserFateStore<>(context, SystemTables.FATE.tableName(), null, null); AdminUtil.FateStatus fateStatus = admin.getStatus(readOnlyUFS, null, null, null); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java index 2618b6c2f8..9f7e2df223 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java @@ -231,9 +231,9 @@ public class FunctionalTestUtils { AdminUtil<String> admin = new AdminUtil<>(); ServerContext context = cluster.getServerContext(); var zk = context.getZooSession(); - MetaFateStore<String> readOnlyMFS = new MetaFateStore<>(zk, null, null); - UserFateStore<String> readOnlyUFS = + ReadOnlyFateStore<String> readOnlyUFS = new UserFateStore<>(context, SystemTables.FATE.tableName(), null, null); + ReadOnlyFateStore<String> readOnlyMFS = new MetaFateStore<>(zk, null, null); Map<FateInstanceType,ReadOnlyFateStore<String>> readOnlyFateStores = Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER, readOnlyUFS); var lockPath = context.getServerPaths().createTableLocksPath();