This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit dc3a448af342412edff6aba59460998888f2e782 Merge: d64dda4c66 9c436cb466 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Apr 14 18:14:57 2025 +0000 Merge branch '2.1' .../accumulo/core/clientImpl/ClientContext.java | 1 - .../java/org/apache/accumulo/core/fate/Fate.java | 38 +++++++--------------- .../org/apache/accumulo/compactor/Compactor.java | 4 +-- .../java/org/apache/accumulo/manager/Manager.java | 4 +-- .../org/apache/accumulo/test/fate/FastFate.java | 4 ++- .../accumulo/test/fate/FateExecutionOrderIT.java | 3 +- .../java/org/apache/accumulo/test/fate/FateIT.java | 3 +- .../accumulo/test/fate/FateOpsCommandsIT.java | 3 +- .../org/apache/accumulo/test/fate/FlakyFate.java | 3 +- .../accumulo/test/fate/MultipleStoresIT.java | 12 ++++--- 10 files changed, 33 insertions(+), 42 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 6a5df1550c,c9e714f74c..da9becc64e --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@@ -1090,64 -1132,4 +1090,63 @@@ public class ClientContext implements A } return this.zkLockChecker; } + + public ServiceLockPaths getServerPaths() { + return this.serverPaths.get(); + } + + public NamespaceMapping getNamespaces() { + ensureOpen(); + return namespaces; + } + + public ClientTabletCache getTabletLocationCache(TableId tableId) { + ensureOpen(); + return tabletLocationCache.get(DataLevel.of(tableId)).computeIfAbsent(tableId, + (TableId key) -> { + var lockChecker = getTServerLockChecker(); + if (AccumuloTable.ROOT.tableId().equals(tableId)) { + return new RootClientTabletCache(lockChecker); + } + var mlo = new MetadataCachedTabletObtainer(); + if (AccumuloTable.METADATA.tableId().equals(tableId)) { + return new ClientTabletCacheImpl(AccumuloTable.METADATA.tableId(), + getTabletLocationCache(AccumuloTable.ROOT.tableId()), mlo, lockChecker); + } else { + return new 
ClientTabletCacheImpl(tableId, + getTabletLocationCache(AccumuloTable.METADATA.tableId()), mlo, lockChecker); + } + }); + } + + /** + * Clear the currently cached tablet locations. The use of ConcurrentHashMap ensures this is + * thread-safe. However, since the ConcurrentHashMap iterator is weakly consistent, it does not + * block new locations from being cached. If new locations are added while this is executing, they + * may be immediately invalidated by this code. Multiple calls to this method in different threads + * may cause some location caches to be invalidated multiple times. That is okay, because cache + * invalidation is idempotent. + */ + public void clearTabletLocationCache() { + tabletLocationCache.forEach((dataLevel, map) -> { + // use iter.remove() instead of calling clear() on the map, to prevent clearing entries that + // may not have been invalidated + var iter = map.values().iterator(); + while (iter.hasNext()) { + iter.next().invalidate(); + iter.remove(); + } + }); + } + + private static Set<String> createPersistentWatcherPaths() { + Set<String> pathsToWatch = new HashSet<>(); + for (String path : Set.of(Constants.ZCOMPACTORS, Constants.ZDEADTSERVERS, Constants.ZGC_LOCK, + Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, Constants.ZMONITOR_LOCK, + Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES, + Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET, Constants.ZTEST_LOCK)) { + pathsToWatch.add(path); + } + return pathsToWatch; + } - } diff --cc core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 4658718480,aac1921914..560c7da0b2 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@@ -71,100 -58,21 +71,100 @@@ import com.google.gson.JsonParser public class Fate<T> { private static final Logger log = LoggerFactory.getLogger(Fate.class); - private final Logger runnerLog = 
LoggerFactory.getLogger(TransactionRunner.class); - private final TStore<T> store; - private final T environment; - private ExecutorService executor; + private final FateStore<T> store; - private final ScheduledThreadPoolExecutor fatePoolsWatcher; ++ private final ScheduledFuture<?> fatePoolsWatcherFuture; + private final AtomicInteger needMoreThreadsWarnCount = new AtomicInteger(0); + private final ExecutorService deadResCleanerExecutor; private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN); + public static final Duration INITIAL_DELAY = Duration.ofSeconds(3); + private static final Duration DEAD_RES_CLEANUP_DELAY = Duration.ofMinutes(3); + private static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(30); private final AtomicBoolean keepRunning = new AtomicBoolean(true); + private final Set<FateExecutor<T>> fateExecutors = new HashSet<>(); public enum TxInfo { - TX_NAME, AUTO_CLEAN, EXCEPTION, RETURN_VALUE + FATE_OP, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE + } + + public enum FateOperation { + COMMIT_COMPACTION(null), + NAMESPACE_CREATE(TFateOperation.NAMESPACE_CREATE), + NAMESPACE_DELETE(TFateOperation.NAMESPACE_DELETE), + NAMESPACE_RENAME(TFateOperation.NAMESPACE_RENAME), + SHUTDOWN_TSERVER(null), + SYSTEM_SPLIT(null), + SYSTEM_MERGE(null), + TABLE_BULK_IMPORT2(TFateOperation.TABLE_BULK_IMPORT2), + TABLE_CANCEL_COMPACT(TFateOperation.TABLE_CANCEL_COMPACT), + TABLE_CLONE(TFateOperation.TABLE_CLONE), + TABLE_COMPACT(TFateOperation.TABLE_COMPACT), + TABLE_CREATE(TFateOperation.TABLE_CREATE), + TABLE_DELETE(TFateOperation.TABLE_DELETE), + TABLE_DELETE_RANGE(TFateOperation.TABLE_DELETE_RANGE), + TABLE_EXPORT(TFateOperation.TABLE_EXPORT), + TABLE_IMPORT(TFateOperation.TABLE_IMPORT), + TABLE_MERGE(TFateOperation.TABLE_MERGE), + TABLE_OFFLINE(TFateOperation.TABLE_OFFLINE), + TABLE_ONLINE(TFateOperation.TABLE_ONLINE), + TABLE_RENAME(TFateOperation.TABLE_RENAME), + TABLE_SPLIT(TFateOperation.TABLE_SPLIT), + 
TABLE_TABLET_AVAILABILITY(TFateOperation.TABLE_TABLET_AVAILABILITY); + + private final TFateOperation top; + private static final Set<FateOperation> nonThriftOps = Collections.unmodifiableSet( + EnumSet.of(COMMIT_COMPACTION, SHUTDOWN_TSERVER, SYSTEM_SPLIT, SYSTEM_MERGE)); + private static final Set<FateOperation> allUserFateOps = + Collections.unmodifiableSet(EnumSet.allOf(FateOperation.class)); + private static final Set<FateOperation> allMetaFateOps = + Collections.unmodifiableSet(EnumSet.allOf(FateOperation.class)); + + FateOperation(TFateOperation top) { + this.top = top; + } + + public static FateOperation fromThrift(TFateOperation top) { + return FateOperation.valueOf(top.name()); + } + + public static Set<FateOperation> getNonThriftOps() { + return nonThriftOps; + } + + public TFateOperation toThrift() { + if (top == null) { + throw new IllegalStateException(this + " does not have an equivalent thrift form"); + } + return top; + } + + public static Set<FateOperation> getAllUserFateOps() { + return allUserFateOps; + } + + public static Set<FateOperation> getAllMetaFateOps() { + return allMetaFateOps; + } } - private class TransactionRunner implements Runnable { + // The fate pools watcher: + // - Maintains a TransactionRunner per available thread per pool/FateExecutor. Does so by + // periodically checking the pools for an inactive thread (i.e., a thread running a + // TransactionRunner died or the pool size was increased in the property), resizing the pool and + // submitting new runners as needed. Also safely stops the necessary number of TransactionRunners + // if the pool size in the property was decreased. + // - Warns the user to consider increasing the pool size (or splitting the fate ops assigned to + // that pool into separate pools) for any pool that does not often have any idle threads. 
+ private class FatePoolsWatcher implements Runnable { + private final T environment; + private final AccumuloConfiguration conf; + + private FatePoolsWatcher(T environment, AccumuloConfiguration conf) { + this.environment = environment; + this.conf = conf; + } @Override public void run() { @@@ -223,122 -166,77 +223,122 @@@ } } } + } - private void transitionToFailed(long tid, Exception e) { - String tidStr = FateTxId.formatTid(tid); - final String msg = "Failed to execute Repo " + tidStr; - // Certain FATE ops that throw exceptions don't need to be propagated up to the Monitor - // as a warning. They're a normal, handled failure condition. - if (e instanceof AcceptableException) { - var tableOpEx = (AcceptableThriftTableOperationException) e; - log.debug(msg + " for {}({}) {}", tableOpEx.getTableName(), tableOpEx.getTableId(), - tableOpEx.getDescription()); - } else { - log.warn(msg, e); + /** + * A thread that finds reservations held by dead processes and unreserves them + */ + private class DeadReservationCleaner implements Runnable { + @Override + public void run() { + if (keepRunning.get()) { + store.deleteDeadReservations(); } - store.setTransactionInfo(tid, TxInfo.EXCEPTION, e); - store.setStatus(tid, FAILED_IN_PROGRESS); - log.info("Updated status for Repo with {} to FAILED_IN_PROGRESS", tidStr); } + } - private void processFailed(long tid, Repo<T> op) { - while (op != null) { - undo(tid, op); + /** + * Creates a Fault-tolerant executor for the given store type. + * + * @param runDeadResCleaner Whether this FATE should run a dead reservation cleaner. The real + * FATEs need have a cleaner, but may be undesirable in testing. 
+ * @param toLogStrFunc A function that converts Repo to Strings that are suitable for logging + */ + public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner, - Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) { ++ Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf, ++ ScheduledThreadPoolExecutor genSchedExecutor) { + this.store = FateLogger.wrap(store, toLogStrFunc, false); + - this.fatePoolsWatcher = - ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf); - ThreadPools.watchCriticalScheduledTask( - fatePoolsWatcher.scheduleWithFixedDelay(new FatePoolsWatcher(environment, conf), - INITIAL_DELAY.toSeconds(), getPoolWatcherDelay().toSeconds(), SECONDS)); ++ fatePoolsWatcherFuture = ++ genSchedExecutor.scheduleWithFixedDelay(new FatePoolsWatcher(environment, conf), ++ INITIAL_DELAY.toSeconds(), getPoolWatcherDelay().toSeconds(), SECONDS); ++ ThreadPools.watchCriticalScheduledTask(fatePoolsWatcherFuture); + + ScheduledExecutorService deadResCleanerExecutor = null; + if (runDeadResCleaner) { + // Create a dead reservation cleaner for this store that will periodically clean up + // reservations held by dead processes, if they exist. + deadResCleanerExecutor = ThreadPools.getServerThreadPools().createScheduledExecutorService(1, + store.type() == FateInstanceType.USER ? 
USER_DEAD_RESERVATION_CLEANER_POOL.poolName + : META_DEAD_RESERVATION_CLEANER_POOL.poolName); + ScheduledFuture<?> deadReservationCleaner = + deadResCleanerExecutor.scheduleWithFixedDelay(new DeadReservationCleaner(), + INITIAL_DELAY.toSeconds(), getDeadResCleanupDelay().toSeconds(), SECONDS); + ThreadPools.watchCriticalScheduledTask(deadReservationCleaner); + } + this.deadResCleanerExecutor = deadResCleanerExecutor; - store.pop(tid); - op = store.top(tid); - } + startFateExecutors(environment, conf, fateExecutors); + } - store.setStatus(tid, FAILED); - doCleanUp(tid); + protected void startFateExecutors(T environment, AccumuloConfiguration conf, + Set<FateExecutor<T>> fateExecutors) { + for (var poolConf : getPoolConfigurations(conf).entrySet()) { + // no fate threads are running at this point; fine not to synchronize + fateExecutors + .add(new FateExecutor<>(this, environment, poolConf.getKey(), poolConf.getValue())); } + } - private void doCleanUp(long tid) { - Boolean autoClean = (Boolean) store.getTransactionInfo(tid, TxInfo.AUTO_CLEAN); - if (autoClean != null && autoClean) { - store.delete(tid); - } else { - // no longer need persisted operations, so delete them to save space in case - // TX is never cleaned up... - while (store.top(tid) != null) { - store.pop(tid); - } - } + /** + * Returns a map of the current pool configurations as set in the given config. Each key is a set + * of fate operations and each value is an integer for the number of threads assigned to work + * those fate operations. 
+ */ + protected Map<Set<FateOperation>,Integer> getPoolConfigurations(AccumuloConfiguration conf) { + Map<Set<FateOperation>,Integer> poolConfigs = new HashMap<>(); + final var json = JsonParser.parseString(conf.get(getFateConfigProp())).getAsJsonObject(); + + for (var entry : json.entrySet()) { + var key = entry.getKey(); + var val = entry.getValue().getAsInt(); + var fateOpsStrArr = key.split(","); + Set<FateOperation> fateOpsSet = Arrays.stream(fateOpsStrArr).map(FateOperation::valueOf) + .collect(Collectors.toCollection(TreeSet::new)); + + poolConfigs.put(fateOpsSet, val); } - private void undo(long tid, Repo<T> op) { - try { - op.undo(tid, environment); - } catch (Exception e) { - log.warn("Failed to undo Repo, " + FateTxId.formatTid(tid), e); - } - } + return poolConfigs; } - protected long executeIsReady(Long tid, Repo<T> op) throws Exception { - var startTime = Timer.startNew(); - var deferTime = op.isReady(tid, environment); - if (log.isTraceEnabled()) { - log.trace("Running {}.isReady() {} took {} ms and returned {}", op.getName(), - FateTxId.formatTid(tid), startTime.elapsed(MILLISECONDS), deferTime); - } - return deferTime; + protected AtomicBoolean getKeepRunning() { + return keepRunning; + } + + protected FateStore<T> getStore() { + return store; + } + + protected Property getFateConfigProp() { + return this.store.type() == FateInstanceType.USER ? Property.MANAGER_FATE_USER_CONFIG + : Property.MANAGER_FATE_META_CONFIG; + } + + public Duration getDeadResCleanupDelay() { + return DEAD_RES_CLEANUP_DELAY; + } + + public Duration getPoolWatcherDelay() { + return POOL_WATCHER_DELAY; } - protected Repo<T> executeCall(Long tid, Repo<T> op) throws Exception { - var startTime = Timer.startNew(); - var next = op.call(tid, environment); - if (log.isTraceEnabled()) { - log.trace("Running {}.call() {} took {} ms and returned {}", op.getName(), - FateTxId.formatTid(tid), startTime.elapsed(MILLISECONDS), - next == null ? 
"null" : next.getName()); + /** + * Returns the number of TransactionRunners active for the FateExecutor assigned to work on the + * given set of fate operations. "Active" meaning it is waiting for a transaction to work on or + * actively working on one. Returns 0 if no such FateExecutor exists. This should only be used for + * testing + */ + @VisibleForTesting + public int getTxRunnersActive(Set<FateOperation> fateOps) { + synchronized (fateExecutors) { + for (var fateExecutor : fateExecutors) { + if (fateExecutor.getFateOps().equals(fateOps)) { + return fateExecutor.getRunningTxRunners().size(); + } + } } - return next; + return 0; } /** @@@ -480,102 -417,11 +480,86 @@@ } /** - * Flags that FATE threadpool to clear out and end. Does not actively stop running FATE processes. + * Lists transctions for a given fate key type. + */ + public Stream<FateKey> list(FateKey.FateKeyType type) { + return store.list(type); + } + + /** + * Initiates shutdown of background threads and optionally waits on them. 
*/ - public void shutdown() { - keepRunning.set(false); - executor.shutdown(); + public void shutdown(long timeout, TimeUnit timeUnit) { + log.info("Shutting down {} FATE", store.type()); + + if (keepRunning.compareAndSet(true, false)) { + synchronized (fateExecutors) { + for (var fateExecutor : fateExecutors) { + fateExecutor.initiateShutdown(); + } + } + if (deadResCleanerExecutor != null) { + deadResCleanerExecutor.shutdown(); + } - fatePoolsWatcher.shutdown(); ++ fatePoolsWatcherFuture.cancel(false); + } + + if (timeout > 0) { + long start = System.nanoTime(); + try { + waitForAllFateExecShutdown(start, timeout, timeUnit); + waitForDeadResCleanerShutdown(start, timeout, timeUnit); - waitForFatePoolsWatcherShutdown(start, timeout, timeUnit); + - if (anyFateExecutorIsAlive() || deadResCleanerIsAlive() || fatePoolsWatcherIsAlive()) { ++ if (anyFateExecutorIsAlive() || deadResCleanerIsAlive()) { + log.warn( + "Waited for {}ms for all fate {} background threads to stop, but some are still running. 
" - + "fate executor threads:{} dead reservation cleaner thread:{} " - + "fate pools watcher thread:{}", ++ + "fate executor threads:{} dead reservation cleaner thread:{} ", + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), store.type(), - anyFateExecutorIsAlive(), deadResCleanerIsAlive(), fatePoolsWatcherIsAlive()); ++ anyFateExecutorIsAlive(), deadResCleanerIsAlive()); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + // interrupt the background threads + synchronized (fateExecutors) { + for (var fateExecutor : fateExecutors) { + fateExecutor.shutdownNow(); + fateExecutor.getIdleCountHistory().clear(); + } + } + if (deadResCleanerExecutor != null) { + deadResCleanerExecutor.shutdownNow(); + } - fatePoolsWatcher.shutdownNow(); + } + + private boolean anyFateExecutorIsAlive() { + synchronized (fateExecutors) { + return fateExecutors.stream().anyMatch(FateExecutor::isAlive); + } + } + + private boolean deadResCleanerIsAlive() { + return deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated(); } - private boolean fatePoolsWatcherIsAlive() { - return !fatePoolsWatcher.isTerminated(); - } - + private void waitForAllFateExecShutdown(long start, long timeout, TimeUnit timeUnit) + throws InterruptedException { + synchronized (fateExecutors) { + for (var fateExecutor : fateExecutors) { + fateExecutor.waitForShutdown(start, timeout, timeUnit); + } + } + } + + private void waitForDeadResCleanerShutdown(long start, long timeout, TimeUnit timeUnit) + throws InterruptedException { + while (((System.nanoTime() - start) < timeUnit.toNanos(timeout)) && deadResCleanerIsAlive()) { + if (deadResCleanerExecutor != null && !deadResCleanerExecutor.awaitTermination(1, SECONDS)) { + log.debug("Fate {} is waiting for dead reservation cleaner thread to terminate", + store.type()); + } + } + } - - private void waitForFatePoolsWatcherShutdown(long start, long timeout, TimeUnit timeUnit) - throws InterruptedException { - while 
(((System.nanoTime() - start) < timeUnit.toNanos(timeout)) && fatePoolsWatcherIsAlive()) { - if (!fatePoolsWatcher.awaitTermination(1, SECONDS)) { - log.debug("Fate {} is waiting for fate pools watcher thread to terminate", store.type()); - } - } - } } diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 2bfc6b7276,9d95a73429..953b70f53b --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -688,8 -686,8 +688,8 @@@ public class Compactor extends Abstract metricsInfo.init(getServiceTags(clientAddress)); var watcher = new CompactionWatcher(getConfiguration()); - var schedExecutor = ThreadPools.getServerThreadPools() - .createGeneralScheduledExecutorService(getConfiguration()); + var schedExecutor = getContext().getScheduledExecutor(); - startGCLogger(schedExecutor); ++ startCancelChecker(schedExecutor, getConfiguration().getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL)); diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 4c6d75755b,8cb51d160b..78e5c017c9 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@@ -1487,16 -1494,18 +1487,16 @@@ public class Manager extends AbstractSe } } - @Deprecated - private void initializeZkForReplication(ZooReaderWriter zReaderWriter, String zroot) { - try { - org.apache.accumulo.server.replication.ZooKeeperInitialization - .ensureZooKeeperInitialized(zReaderWriter, zroot); - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException("Exception while ensuring ZooKeeper is initialized", e); - } - } + protected Fate<Manager> initializeFateInstance(ServerContext context, FateStore<Manager> store) { + - final Fate<Manager> fateInstance = - new Fate<>(this, store, true, TraceRepo::toLogString, getConfiguration()); ++ 
final Fate<Manager> fateInstance = new Fate<>(this, store, true, TraceRepo::toLogString, ++ getConfiguration(), context.getScheduledExecutor()); - protected Fate<Manager> initializeFateInstance(TStore<Manager> store) { - return new Fate<>(this, store, TraceRepo::toLogString); + var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), this::getSteadyTime); + ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() + .scheduleWithFixedDelay(fateCleaner::ageOff, 10, 4 * 60, MINUTES)); + + return fateInstance; } /** diff --cc test/src/main/java/org/apache/accumulo/test/fate/FastFate.java index e33906f29c,0000000000..f23596569f mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java @@@ -1,52 -1,0 +1,54 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test.fate; + +import java.time.Duration; ++import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.function.Function; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.Repo; + +/** + * A FATE which performs the dead reservation cleanup and the check on the pool size with a much + * shorter delay between. Useful for shortening test times for tests that are waiting for one of + * these actions to occur. + */ +public class FastFate<T> extends Fate<T> { + private static final Duration DEAD_RES_CLEANUP_DELAY = Duration.ofSeconds(5); + private static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(5); + + public FastFate(T environment, FateStore<T> store, boolean runDeadResCleaner, + Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) { - super(environment, store, runDeadResCleaner, toLogStrFunc, conf); ++ super(environment, store, runDeadResCleaner, toLogStrFunc, conf, ++ new ScheduledThreadPoolExecutor(2)); + } + + @Override + public Duration getDeadResCleanupDelay() { + return DEAD_RES_CLEANUP_DELAY; + } + + @Override + public Duration getPoolWatcherDelay() { + return POOL_WATCHER_DELAY; + } +} diff --cc test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java index ba01be9990,0000000000..da3b76a242 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java @@@ -1,418 -1,0 +1,419 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL; +import static org.apache.accumulo.test.fate.FateTestUtil.TEST_FATE_OP; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; ++import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.data.Key; +import 
org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.server.ServerContext; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.Iterators; + +public abstract class FateExecutionOrderIT extends SharedMiniClusterBase + implements FateTestRunner<FateExecutionOrderIT.FeoTestEnv> { + + public static class FeoTestEnv extends FateTestRunner.TestEnv { + private final AccumuloClient client; + + public FeoTestEnv(AccumuloClient client) { + this.client = client; + } + + AccumuloClient getClient() { + return client; + } + } + + public static class FirstOp implements Repo<FateExecutionOrderIT.FeoTestEnv> { + + private static final long serialVersionUID = 1L; + + protected boolean isTrackingDataSet(FateId tid, FeoTestEnv env, String step) throws Exception { + try (Scanner scanner = env.getClient().createScanner(FATE_TRACKING_TABLE)) { + return scanner.stream() + .anyMatch(e -> e.getKey().getColumnFamily().toString().equals(tid.canonical()) + && e.getValue().toString().equals(step)); + } + } + + protected static void insertTrackingData(FateId tid, FeoTestEnv env, String step) + throws TableNotFoundException, MutationsRejectedException { + try (BatchWriter bw = env.getClient().createBatchWriter(FATE_TRACKING_TABLE)) { + Mutation mut = new Mutation(Long.toString(System.currentTimeMillis())); + mut.put(tid.canonical(), "", step); + bw.addMutation(mut); + } + } + + @Override + public long isReady(FateId tid, FeoTestEnv env) throws Exception { + // First call to 
isReady will return that it's not ready (defer time of 100ms), inserting + // the data 'isReady1' so we know isReady was called once. The second attempt (after the + // deferral time) will pass as ready (return 0) and insert the data 'isReady2' so we know + // the second call to isReady was made + Thread.sleep(50); + var step = this.getName() + "::isReady"; + if (isTrackingDataSet(tid, env, step + "1")) { + insertTrackingData(tid, env, step + "2"); + return 0; + } else { + insertTrackingData(tid, env, step + "1"); + return 100; + } + } + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv env) throws Exception { + Thread.sleep(50); + insertTrackingData(tid, env, this.getName() + "::call"); + return new SecondOp(); + } + + @Override + public void undo(FateId fateId, FeoTestEnv environment) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public String getReturn() { + return ""; + } + } + + public static class SecondOp extends FirstOp { + private static final long serialVersionUID = 1L; + + @Override + public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception { + super.call(tid, environment); + return new LastOp(); + } + + } + + public static class LastOp extends FirstOp { + private static final long serialVersionUID = 1L; + + @Override + public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception { + super.call(tid, environment); + return null; + } + } + + private static final String FATE_TRACKING_TABLE = "fate_tracking"; + + @BeforeAll + public static void setup() throws Exception { + SharedMiniClusterBase.startMiniCluster(); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + NewTableConfiguration ntc = new NewTableConfiguration(); + ntc.withInitialTabletAvailability(TabletAvailability.HOSTED); + client.tableOperations().create(FATE_TRACKING_TABLE, 
ntc); + } + } + + @AfterAll + public static void teardown() throws Exception { + SharedMiniClusterBase.stopMiniCluster(); + } + + @BeforeEach + public void before() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().deleteRows(FATE_TRACKING_TABLE, null, null); + } + } + + private void waitFor(FateStore<FeoTestEnv> store, FateId txid) throws Exception { + while (store.read(txid).getStatus() != SUCCESSFUL) { + Thread.sleep(50); + } + } + + protected Fate<FeoTestEnv> initializeFate(AccumuloClient client, FateStore<FeoTestEnv> store) { + return new Fate<>(new FeoTestEnv(client), store, false, r -> r + "", - FateTestUtil.createTestFateConfig(1)); ++ FateTestUtil.createTestFateConfig(1), new ScheduledThreadPoolExecutor(2)); + } + + private static Entry<FateId,String> toIdStep(Entry<Key,Value> e) { + return new AbstractMap.SimpleImmutableEntry<>( + FateId.from(e.getKey().getColumnFamily().toString()), e.getValue().toString()); + } + + @Test + public void testInterleaving() throws Exception { + executeTest(this::testInterleaving); + } + + protected void testInterleaving(FateStore<FeoTestEnv> store, ServerContext sctx) + throws Exception { + + // This test verifies that FATE will interleave at least once between fate operations when + // their isReady() returns > 0. Interleaving is not guaranteed, so we just check for one + // occurrence which is highly unlikely to fail unless something is broken with FATE. + // This test also ensures that the expected order of operations occurs per fate op. + // Interleaving should have no effect on this. 
+ + final int numFateIds = 3; + FateId[] fateIds = new FateId[numFateIds]; + + for (int i = 0; i < numFateIds; i++) { + fateIds[i] = store.create(); + var txStore = store.reserve(fateIds[i]); + try { + txStore.push(new FirstOp()); + txStore.setTransactionInfo(TxInfo.FATE_OP, TEST_FATE_OP); + txStore.setStatus(SUBMITTED); + } finally { + txStore.unreserve(Duration.ZERO); + } + } + + Fate<FeoTestEnv> fate = null; + + // The execution order of the transactions is not according to their insertion + // order. However, we do know that the first step of each transaction will be + // executed before the second steps. + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + fate = initializeFate(client, store); + + for (var fateId : fateIds) { + waitFor(store, fateId); + } + + Scanner scanner = client.createScanner(FATE_TRACKING_TABLE); + var iter = scanner.stream().map(FateExecutionOrderIT::toIdStep).iterator(); + + // we should see the following execution order for all fate ids: + // FirstOp::isReady1, FirstOp::isReady2, FirstOp::call, + // SecondOp::isReady1, SecondOp::isReady2, SecondOp::call, + // LastOp::isReady1, LastOp::isReady2, LastOp::call + // the first isReady of each op will defer the op to be executed later, allowing for the FATE + // thread to interleave and work on another fate id, but may not always interleave. + // It is unlikely that the FATE will not interleave at least once in a run, so we will check + // for at least one occurrence. 
+ int interleaves = 0; + int i = 0; + Map.Entry<FateId,String> prevOp = null; + var expRunOrder = List.of("FirstOp::isReady1", "FirstOp::isReady2", "FirstOp::call", + "SecondOp::isReady1", "SecondOp::isReady2", "SecondOp::call", "LastOp::isReady1", + "LastOp::isReady2", "LastOp::call"); + var fateIdsToExpRunOrder = Map.of(fateIds[0], new ArrayList<>(expRunOrder), fateIds[1], + new ArrayList<>(expRunOrder), fateIds[2], new ArrayList<>(expRunOrder)); + + while (iter.hasNext()) { + var currOp = iter.next(); + FateId fateId = currOp.getKey(); + String currStep = currOp.getValue(); + var expRunOrderFateId = fateIdsToExpRunOrder.get(fateId); + + boolean passedFirstStep = !currStep.equals(expRunOrder.get(0)); + boolean prevFateIdDiffered = prevOp != null && !prevOp.getKey().equals(fateId); + if (passedFirstStep && prevFateIdDiffered) { + interleaves++; + } + assertEquals(currStep, expRunOrderFateId.remove(0)); + prevOp = currOp; + i++; + } + + assertTrue(interleaves > 0); + assertEquals(i, expRunOrder.size() * numFateIds); + assertEquals(numFateIds, fateIdsToExpRunOrder.size()); + for (var expRunOrderFateId : fateIdsToExpRunOrder.values()) { + assertTrue(expRunOrderFateId.isEmpty()); + } + + } finally { + if (fate != null) { + fate.shutdown(10, TimeUnit.MINUTES); + } + } + } + + public static class FirstNonInterleavingOp extends FirstOp { + + private static final long serialVersionUID = 1L; + + @Override + public long isReady(FateId tid, FeoTestEnv env) throws Exception { + Thread.sleep(50); + insertTrackingData(tid, env, this.getName() + "::isReady"); + return 0; + } + + @Override + public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv manager) throws Exception { + Thread.sleep(50); + insertTrackingData(tid, manager, this.getName() + "::call"); + return new SecondNonInterleavingOp(); + } + } + + public static class SecondNonInterleavingOp extends FirstNonInterleavingOp { + + private static final long serialVersionUID = 1L; + + @Override + public Repo<FeoTestEnv> 
call(FateId tid, FeoTestEnv environment) throws Exception { + super.call(tid, environment); + return new LastNonInterleavingOp(); + } + + } + + public static class LastNonInterleavingOp extends FirstNonInterleavingOp { + + private static final long serialVersionUID = 1L; + + @Override + public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception { + super.call(tid, environment); + return null; + } + + } + + @Test + public void testNonInterleaving() throws Exception { + executeTest(this::testNonInterleaving); + } + + protected void testNonInterleaving(FateStore<FeoTestEnv> store, ServerContext sctx) + throws Exception { + + // This test ensures that when isReady() always returns zero that all the fate steps will + // execute immediately + + final int numFateIds = 3; + FateId[] fateIds = new FateId[numFateIds]; + + for (int i = 0; i < numFateIds; i++) { + fateIds[i] = store.create(); + var txStore = store.reserve(fateIds[i]); + try { + txStore.push(new FirstNonInterleavingOp()); + txStore.setTransactionInfo(TxInfo.FATE_OP, TEST_FATE_OP); + txStore.setStatus(SUBMITTED); + } finally { + txStore.unreserve(Duration.ZERO); + } + } + + Fate<FeoTestEnv> fate = null; + + // The execution order of the transactions is not according to their insertion + // order. 
In this case, without interleaving, a transaction will run start to finish + // before moving on to the next transaction + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + fate = initializeFate(client, store); + + for (var fateId : fateIds) { + waitFor(store, fateId); + } + + Scanner scanner = client.createScanner(FATE_TRACKING_TABLE); + Iterator<Entry<Key,Value>> iter = scanner.iterator(); + + SortedMap<Key,Value> subset = new TreeMap<>(); + + // should see one fate op execute all of its steps + var seenId1 = verifySameIds(iter, subset); + // should see another fate op execute all of its steps + var seenId2 = verifySameIds(iter, subset); + // should see another fate op execute all of its steps + var seenId3 = verifySameIds(iter, subset); + + assertEquals(Set.of(fateIds[0], fateIds[1], fateIds[2]), Set.of(seenId1, seenId2, seenId3)); + + assertFalse(iter.hasNext()); + + } finally { + if (fate != null) { + fate.shutdown(10, TimeUnit.MINUTES); + } + } + } + + private FateId verifySameIds(Iterator<Entry<Key,Value>> iter, SortedMap<Key,Value> subset) { + subset.clear(); + Iterators.limit(iter, 6).forEachRemaining(e -> subset.put(e.getKey(), e.getValue())); + + Text fateId = subset.keySet().iterator().next().getColumnFamily(); + assertTrue(subset.keySet().stream().allMatch(k -> k.getColumnFamily().equals(fateId))); + + // list is used to ensure correct operations and correct order of operations + var expectedVals = List.of("FirstNonInterleavingOp::isReady", "FirstNonInterleavingOp::call", + "SecondNonInterleavingOp::isReady", "SecondNonInterleavingOp::call", + "LastNonInterleavingOp::isReady", "LastNonInterleavingOp::call"); + var actualVals = subset.values().stream().map(Value::toString).collect(Collectors.toList()); + assertEquals(expectedVals, actualVals); + + return FateId.from(fateId.toString()); + } + +} diff --cc test/src/main/java/org/apache/accumulo/test/fate/FateIT.java index bb70e6dffd,0000000000..fab396d8ca mode 
100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java @@@ -1,567 -1,0 +1,568 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN; +import static org.apache.accumulo.test.fate.FateTestUtil.TEST_FATE_OP; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static 
org.junit.jupiter.api.Assertions.fail; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; ++import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.fate.FateTestRunner.TestEnv; +import org.apache.accumulo.test.util.Wait; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class FateIT extends SharedMiniClusterBase implements FateTestRunner<TestEnv> { + + private static final Logger LOG = LoggerFactory.getLogger(FateIT.class); + + private static CountDownLatch callStarted; + private static CountDownLatch finishCall; + private static CountDownLatch undoLatch; + + private enum ExceptionLocation { + CALL, IS_READY + } + + public static class TestRepo implements Repo<TestEnv> { + private static final long serialVersionUID = 1L; + + private final String data; + + public TestRepo() { + this("test"); + } + + public TestRepo(String data) { + this.data = data; + } + + @Override + public long isReady(FateId fateId, TestEnv environment) throws Exception { + return 0; + } + + @Override + public String getName() { + return "TestRepo_" + data; + } + + @Override + public Repo<TestEnv> call(FateId 
fateId, TestEnv environment) throws Exception { + LOG.debug("Entering call {}", fateId); + try { + FateIT.inCall(); + return null; + } finally { + LOG.debug("Leaving call {}", fateId); + } + } + + @Override + public void undo(FateId fateId, TestEnv environment) throws Exception { + + } + + @Override + public String getReturn() { + return data + "_ret"; + } + } + + public static class TestOperationFails implements Repo<TestEnv> { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(TestOperationFails.class); + private static List<String> undoOrder = new ArrayList<>(); + private static final int TOTAL_NUM_OPS = 3; + private int opNum; + private final String opName; + private final ExceptionLocation location; + + public TestOperationFails(int opNum, ExceptionLocation location) { + this.opNum = opNum; + this.opName = "OP" + opNum; + this.location = location; + } + + @Override + public long isReady(FateId fateId, TestEnv environment) throws Exception { + LOG.debug("{} {} Entered isReady()", opName, fateId); + if (location == ExceptionLocation.IS_READY) { + if (opNum < TOTAL_NUM_OPS) { + return 0; + } else { + throw new Exception(opName + " " + fateId + " isReady() failed - this is expected"); + } + } else { + return 0; + } + } + + @Override + public String getName() { + return getClass().getName(); + } + + @Override + public void undo(FateId fateId, TestEnv environment) throws Exception { + LOG.debug("{} {} Entered undo()", opName, fateId); + undoOrder.add(opName); + undoLatch.countDown(); + } + + @Override + public Repo<TestEnv> call(FateId fateId, TestEnv environment) throws Exception { + LOG.debug("{} {} Entered call()", opName, fateId); + if (location == ExceptionLocation.CALL) { + if (opNum < TOTAL_NUM_OPS) { + return new TestOperationFails(++opNum, location); + } else { + throw new Exception(opName + " " + fateId + " call() failed - this is expected"); + } + } else { + return new TestOperationFails(++opNum, 
location); + } + } + + @Override + public String getReturn() { + return "none"; + } + } + + /** + * Test Repo that allows configuring a delay time to be returned in isReady(). + */ + public static class DeferredTestRepo implements Repo<TestEnv> { + private static final long serialVersionUID = 1L; + + private final String data; + + // These are static as we don't want to serialize them and they should + // be shared across all instances during the test + private static final AtomicInteger executedCalls = new AtomicInteger(); + private static final AtomicLong delay = new AtomicLong(); + private static final CountDownLatch callLatch = new CountDownLatch(1); + + public DeferredTestRepo(String data) { + this.data = data; + } + + @Override + public long isReady(FateId fateId, TestEnv environment) { + LOG.debug("{} delayed {}", fateId, delay.get()); + return delay.get(); + } + + @Override + public String getName() { + return "TestRepo_" + data; + } + + @Override + public Repo<TestEnv> call(FateId fateId, TestEnv environment) throws Exception { + callLatch.await(); + LOG.debug("Executing call {}, total executed {}", fateId, executedCalls.incrementAndGet()); + return null; + } + + @Override + public void undo(FateId fateId, TestEnv environment) { + + } + + @Override + public String getReturn() { + return data + "_ret"; + } + } + + @Test + @Timeout(30) + public void testTransactionStatus() throws Exception { + executeTest(this::testTransactionStatus); + } + + protected void testTransactionStatus(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + Fate<TestEnv> fate = initializeFate(store); + try { + + // Wait for the transaction runner to be scheduled. 
+ Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2); + + callStarted = new CountDownLatch(1); + finishCall = new CountDownLatch(1); + + FateId fateId = fate.startTransaction(); + assertEquals(TStatus.NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction(TEST_FATE_OP, fateId, new TestRepo("testTransactionStatus"), true, + "Test Op"); + assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, fateId)); + // wait for call() to be called + callStarted.await(); + assertEquals(IN_PROGRESS, getTxStatus(sctx, fateId)); + // tell the op to exit the method + finishCall.countDown(); + // Check that it transitions to SUCCESSFUL and then removed (UNKNOWN) + final var sawSuccess = new AtomicBoolean(false); + Wait.waitFor(() -> { + TStatus s; + switch (s = getTxStatus(sctx, fateId)) { + case IN_PROGRESS: + if (sawSuccess.get()) { + fail("Should never see IN_PROGRESS after seeing SUCCESSFUL"); + } + break; + case SUCCESSFUL: + // expected, but might be too quick to be detected + if (sawSuccess.compareAndSet(false, true)) { + LOG.debug("Saw expected transaction status change to SUCCESSFUL"); + } + break; + case UNKNOWN: + if (!sawSuccess.get()) { + LOG.debug("Never saw transaction status change to SUCCESSFUL, but that's okay"); + } + return true; + default: + fail("Saw unexpected status: " + s); + } + // keep waiting for UNKNOWN + return false; + }, SECONDS.toMillis(30), 10); + } finally { + fate.shutdown(10, TimeUnit.MINUTES); + } + } + + @Test + public void testCancelWhileNew() throws Exception { + executeTest(this::testCancelWhileNew); + } + + protected void testCancelWhileNew(FateStore<TestEnv> store, ServerContext sctx) throws Exception { + Fate<TestEnv> fate = initializeFate(store); + try { + + // Wait for the transaction runner to be scheduled. 
+ Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2); + + callStarted = new CountDownLatch(1); + finishCall = new CountDownLatch(1); + + FateId fateId = fate.startTransaction(); + LOG.debug("Starting test testCancelWhileNew with {}", fateId); + assertEquals(NEW, getTxStatus(sctx, fateId)); + // cancel the transaction + assertTrue(fate.cancel(fateId)); + assertTrue( + FAILED_IN_PROGRESS == getTxStatus(sctx, fateId) || FAILED == getTxStatus(sctx, fateId)); + fate.seedTransaction(TEST_FATE_OP, fateId, new TestRepo("testCancelWhileNew"), true, + "Test Op"); + Wait.waitFor(() -> FAILED == getTxStatus(sctx, fateId)); + // nothing should have run + assertEquals(1, callStarted.getCount()); + fate.delete(fateId); + assertEquals(UNKNOWN, getTxStatus(sctx, fateId)); + } finally { + fate.shutdown(10, TimeUnit.MINUTES); + } + } + + @Test + public void testCancelWhileSubmittedAndRunning() throws Exception { + executeTest(this::testCancelWhileSubmittedAndRunning); + } + + protected void testCancelWhileSubmittedAndRunning(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + Fate<TestEnv> fate = initializeFate(store); + try { + + // Wait for the transaction runner to be scheduled. + Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2); + + callStarted = new CountDownLatch(1); + finishCall = new CountDownLatch(1); + + FateId fateId = fate.startTransaction(); + LOG.debug("Starting test testCancelWhileSubmitted with {}", fateId); + assertEquals(NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction(TEST_FATE_OP, fateId, new TestRepo("testCancelWhileSubmittedAndRunning"), + false, "Test Op"); + Wait.waitFor(() -> IN_PROGRESS == getTxStatus(sctx, fateId)); + // This is false because the transaction runner has reserved the FaTe + // transaction. 
+ assertFalse(fate.cancel(fateId)); + callStarted.await(); + finishCall.countDown(); + Wait.waitFor(() -> IN_PROGRESS != getTxStatus(sctx, fateId)); + fate.delete(fateId); + assertEquals(UNKNOWN, getTxStatus(sctx, fateId)); + } finally { + fate.shutdown(10, TimeUnit.MINUTES); + } + } + + @Test + public void testCancelWhileInCall() throws Exception { + executeTest(this::testCancelWhileInCall); + } + + protected void testCancelWhileInCall(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + Fate<TestEnv> fate = initializeFate(store); + try { + + // Wait for the transaction runner to be scheduled. + Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2); + + callStarted = new CountDownLatch(1); + finishCall = new CountDownLatch(1); + + FateId fateId = fate.startTransaction(); + LOG.debug("Starting test testCancelWhileInCall with {}", fateId); + assertEquals(NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction(TEST_FATE_OP, fateId, new TestRepo("testCancelWhileInCall"), true, + "Test Op"); + assertEquals(SUBMITTED, getTxStatus(sctx, fateId)); + // wait for call() to be called + callStarted.await(); + // cancel the transaction + assertFalse(fate.cancel(fateId)); + finishCall.countDown(); + } finally { + fate.shutdown(10, TimeUnit.MINUTES); + } + + } + + @Test + @Timeout(30) + public void testDeferredOverflow() throws Exception { + // Set a maximum deferred map size of 10 transactions so that when the 11th + // is seen the Fate store should clear the deferred map and mark + // the flag as overflow so that all the deferred transactions will be run + executeTest(this::testDeferredOverflow, 10, AbstractFateStore.DEFAULT_FATE_ID_GENERATOR); + } + + protected void testDeferredOverflow(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + Fate<TestEnv> fate = initializeFate(store); + try { + + // Wait for the transaction runner to be scheduled. 
+ Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2); + + DeferredTestRepo.executedCalls.set(0); + // Initialize the repo to have a delay of 30 seconds + // so it will be deferred when submitted + DeferredTestRepo.delay.set(30000); + + Set<FateId> transactions = new HashSet<>(); + + // Start by creating 10 transactions that are all deferred which should + // fill up the deferred map with all 10 as we set the max deferred limit + // to only allow 10 transactions + for (int i = 0; i < 10; i++) { + submitDeferred(fate, sctx, transactions); + } + + // Verify all 10 are deferred in the map. + // We should not be in an overflow state yet + Wait.waitFor(() -> store.getDeferredCount() == 10); + assertFalse(store.isDeferredOverflow()); + + // After verifying all 10 are deferred, submit another 10 + // which should trigger an overflow. We are blocking in the + // call method of DeferredTestRepo at this point using a countdown + // latch to prevent fate executor from running early and clearing + // the deferred overflow flag before we can check it below + for (int i = 0; i < 10; i++) { + submitDeferred(fate, sctx, transactions); + } + // Verify deferred overflow is true and map is now empty + Wait.waitFor(() -> store.getDeferredCount() == 0); + Wait.waitFor(store::isDeferredOverflow); + + // Set the delay to 0 and countdown so we will process the + // call method in the repos. 
We need to change the delay because + // due to the async nature of Fate it's possible some of the submitted + // repos previously wouldn't be processed in the first batch until + // after the flag was cleared which would trigger a long delay again + DeferredTestRepo.delay.set(0); + DeferredTestRepo.callLatch.countDown(); + + // Verify the flag was cleared and everything ran + Wait.waitFor(() -> !store.isDeferredOverflow()); + Wait.waitFor(() -> DeferredTestRepo.executedCalls.get() == 20); + + // Verify all 20 unique transactions finished + Wait.waitFor(() -> { + transactions.removeIf(fateId -> getTxStatus(sctx, fateId) == UNKNOWN); + return transactions.isEmpty(); + }); + + } finally { + fate.shutdown(10, TimeUnit.MINUTES); + } + } + + @Test + @Timeout(30) + public void testRepoFails() throws Exception { + // Set a maximum deferred map size of 10 transactions so that when the 11th + // is seen the Fate store should clear the deferred map and mark + // the flag as overflow so that all the deferred transactions will be run + executeTest(this::testRepoFails, 10, AbstractFateStore.DEFAULT_FATE_ID_GENERATOR); + } + + protected void testRepoFails(FateStore<TestEnv> store, ServerContext sctx) throws Exception { + /* + * This test ensures that when an exception occurs in a Repo's call() or isReady() methods, that + * undo() will be called back up the chain of Repo's and in the correct order. The test works as + * follows: 1) Repo1 is called and returns Repo2, 2) Repo2 is called and returns Repo3, 3) Repo3 + * is called and throws an exception (in call() or isReady()). It is then expected that: 1) + * undo() is called on Repo3, 2) undo() is called on Repo2, 3) undo() is called on Repo1 + */ + Fate<TestEnv> fate = initializeFate(store); + try { + + // Wait for the transaction runner to be scheduled. 
+ Thread.sleep(Fate.INITIAL_DELAY.toMillis() * 2); + + List<String> expectedUndoOrder = List.of("OP3", "OP2", "OP1"); + /* + * Test exception in call() + */ + TestOperationFails.undoOrder = new ArrayList<>(); + undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS); + FateId fateId = fate.startTransaction(); + assertEquals(NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction(TEST_FATE_OP, fateId, new TestOperationFails(1, ExceptionLocation.CALL), + false, "Test Op Fails"); + // Wait for all the undo() calls to complete + undoLatch.await(); + assertEquals(expectedUndoOrder, TestOperationFails.undoOrder); + assertEquals(FAILED, fate.waitForCompletion(fateId)); + assertTrue(fate.getException(fateId).getMessage().contains("call() failed")); + /* + * Test exception in isReady() + */ + TestOperationFails.undoOrder = new ArrayList<>(); + undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS); + fateId = fate.startTransaction(); + assertEquals(NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction(TEST_FATE_OP, fateId, + new TestOperationFails(1, ExceptionLocation.IS_READY), false, "Test Op Fails"); + // Wait for all the undo() calls to complete + undoLatch.await(); + assertEquals(expectedUndoOrder, TestOperationFails.undoOrder); + assertEquals(FAILED, fate.waitForCompletion(fateId)); + assertTrue(fate.getException(fateId).getMessage().contains("isReady() failed")); + } finally { + fate.shutdown(10, TimeUnit.MINUTES); + } + } + + @Test + @Timeout(30) + public void testNoWriteAfterDelete() throws Exception { + executeTest(this::testNoWriteAfterDelete); + } + + protected void testNoWriteAfterDelete(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + final FateId fateId = store.create(); + final Repo<TestEnv> repo = new TestRepo("testNoWriteAfterDelete"); + + var txStore = store.reserve(fateId); + + // all write ops should be ok after reservation + assertDoesNotThrow(() -> txStore.push(repo)); + assertDoesNotThrow(() -> 
txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL)); + assertDoesNotThrow(txStore::pop); + assertDoesNotThrow(() -> txStore.setTransactionInfo(Fate.TxInfo.FATE_OP, TEST_FATE_OP)); + assertDoesNotThrow(txStore::delete); + + // test that all write ops result in an exception since the tx has been deleted + assertThrows(Exception.class, () -> txStore.push(repo)); + assertThrows(Exception.class, () -> txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL)); + assertThrows(Exception.class, txStore::pop); + assertThrows(Exception.class, + () -> txStore.setTransactionInfo(Fate.TxInfo.FATE_OP, TEST_FATE_OP)); + assertThrows(Exception.class, txStore::delete); + } + + private void submitDeferred(Fate<TestEnv> fate, ServerContext sctx, Set<FateId> transactions) { + FateId fateId = fate.startTransaction(); + transactions.add(fateId); + assertEquals(TStatus.NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction(TEST_FATE_OP, fateId, new DeferredTestRepo("testDeferredOverflow"), true, + "Test Op"); + assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, fateId)); + } + + protected Fate<TestEnv> initializeFate(FateStore<TestEnv> store) { + return new Fate<>(new TestEnv(), store, false, r -> r + "", - FateTestUtil.createTestFateConfig(1)); ++ FateTestUtil.createTestFateConfig(1), new ScheduledThreadPoolExecutor(2)); + } + + protected abstract TStatus getTxStatus(ServerContext sctx, FateId fateId); + + private static void inCall() throws InterruptedException { + // signal that call started + callStarted.countDown(); + // wait for the signal to exit the method + finishCall.await(); + } +} diff --cc test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java index f887fd4062,0000000000..86c2f3e3a9 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java @@@ -1,881 -1,0 +1,882 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import static org.apache.accumulo.test.fate.FateTestUtil.TEST_FATE_OP; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; ++import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import 
org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.AdminUtil; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; +import org.apache.accumulo.core.metadata.AccumuloTable; +import org.apache.accumulo.core.zookeeper.ZooSession; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.util.Admin; +import org.apache.accumulo.server.util.fateCommand.FateSummaryReport; +import org.apache.accumulo.server.util.fateCommand.FateTxnDetails; +import org.apache.accumulo.test.fate.MultipleStoresIT.LatchTestEnv; +import org.apache.accumulo.test.fate.MultipleStoresIT.LatchTestRepo; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.accumulo.test.functional.ReadWriteIT; +import org.apache.accumulo.test.functional.SlowIterator; +import org.apache.accumulo.test.util.Wait; +import org.easymock.EasyMock; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public abstract class FateOpsCommandsIT extends ConfigurableMacBase + implements FateTestRunner<LatchTestEnv> { + + @Override + protected 
Duration defaultTimeout() { + return Duration.ofMinutes(3); + } + + @BeforeEach + public void beforeEachSetup() throws Exception { + // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION transaction which was + // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes + // this issue. + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + Wait.waitFor(() -> getServerContext().getServerPaths() + .getCompactor(rg -> true, AddressSelector.all(), true).isEmpty(), 60_000); + } + + @Test + public void testFateSummaryCommand() throws Exception { + executeTest(this::testFateSummaryCommand); + } + + protected void testFateSummaryCommand(FateStore<LatchTestEnv> store, ServerContext sctx) + throws Exception { + // Configure Fate + Fate<LatchTestEnv> fate = initFateNoDeadResCleaner(store); + + // validate blank report, no transactions have started + ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary", "-j"); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + result = result.lines().filter(line -> !line.matches(".*(INFO|DEBUG|WARN|ERROR).*")) + .collect(Collectors.joining("\n")); + FateSummaryReport report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertTrue(report.getStatusCounts().isEmpty()); + assertTrue(report.getStepCounts().isEmpty()); + assertTrue(report.getCmdCounts().isEmpty()); + assertTrue(report.getStatusFilterNames().isEmpty()); + assertTrue(report.getInstanceTypesFilterNames().isEmpty()); + assertTrue(report.getFateIdFilter().isEmpty()); + validateFateDetails(report.getFateDetails(), 0, null); + + // create Fate transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); + List<String> fateIdsStarted = List.of(fateId1.canonical(), fateId2.canonical()); + + // validate no filters + p = getCluster().exec(Admin.class, "fate", "--summary", "-j"); 
+ assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + result = result.lines().filter(line -> !line.matches(".*(INFO|DEBUG|WARN|ERROR).*")) + .collect(Collectors.joining("\n")); + report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertFalse(report.getStatusCounts().isEmpty()); + assertFalse(report.getStepCounts().isEmpty()); + assertFalse(report.getCmdCounts().isEmpty()); + assertTrue(report.getStatusFilterNames().isEmpty()); + assertTrue(report.getInstanceTypesFilterNames().isEmpty()); + assertTrue(report.getFateIdFilter().isEmpty()); + validateFateDetails(report.getFateDetails(), 2, fateIdsStarted); + + /* + * Test filtering by FateIds + */ + + // validate filtering by both transactions + p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), fateId2.canonical(), + "--summary", "-j"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + result = result.lines().filter(line -> !line.matches(".*(INFO|DEBUG|WARN|ERROR).*")) + .collect(Collectors.joining("\n")); + report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertFalse(report.getStatusCounts().isEmpty()); + assertFalse(report.getStepCounts().isEmpty()); + assertFalse(report.getCmdCounts().isEmpty()); + assertTrue(report.getStatusFilterNames().isEmpty()); + assertTrue(report.getInstanceTypesFilterNames().isEmpty()); + assertEquals(2, report.getFateIdFilter().size()); + assertTrue(report.getFateIdFilter().containsAll(fateIdsStarted)); + validateFateDetails(report.getFateDetails(), 2, fateIdsStarted); + + // validate filtering by just one transaction + p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--summary", "-j"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + result = result.lines().filter(line -> !line.matches(".*(INFO|DEBUG|WARN|ERROR).*")) + .collect(Collectors.joining("\n")); + 
report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertFalse(report.getStatusCounts().isEmpty()); + assertFalse(report.getStepCounts().isEmpty()); + assertFalse(report.getCmdCounts().isEmpty()); + assertTrue(report.getStatusFilterNames().isEmpty()); + assertTrue(report.getInstanceTypesFilterNames().isEmpty()); + assertEquals(1, report.getFateIdFilter().size()); + assertTrue(report.getFateIdFilter().contains(fateId1.canonical())); + validateFateDetails(report.getFateDetails(), 1, fateIdsStarted); + + // validate filtering by non-existent transaction + FateId fakeFateId = FateId.from(store.type(), UUID.randomUUID()); + p = getCluster().exec(Admin.class, "fate", fakeFateId.canonical(), "--summary", "-j"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + result = result.lines().filter(line -> !line.matches(".*(INFO|DEBUG|WARN|ERROR).*")) + .collect(Collectors.joining("\n")); + report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertFalse(report.getStatusCounts().isEmpty()); + assertFalse(report.getStepCounts().isEmpty()); + assertFalse(report.getCmdCounts().isEmpty()); + assertTrue(report.getStatusFilterNames().isEmpty()); + assertTrue(report.getInstanceTypesFilterNames().isEmpty()); + assertEquals(1, report.getFateIdFilter().size()); + assertTrue(report.getFateIdFilter().contains(fakeFateId.canonical())); + validateFateDetails(report.getFateDetails(), 0, fateIdsStarted); + + /* + * Test filtering by States + */ + + // validate status filter by including only FAILED transactions, should be none + p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-s", "FAILED"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + result = result.lines().filter(line -> !line.matches(".*(INFO|DEBUG|WARN|ERROR).*")) + .collect(Collectors.joining("\n")); + report = 
FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertFalse(report.getStatusCounts().isEmpty()); + assertFalse(report.getStepCounts().isEmpty()); + assertFalse(report.getCmdCounts().isEmpty()); + assertEquals(Set.of("FAILED"), report.getStatusFilterNames()); + assertTrue(report.getInstanceTypesFilterNames().isEmpty()); + assertTrue(report.getFateIdFilter().isEmpty()); + validateFateDetails(report.getFateDetails(), 0, fateIdsStarted); + + // validate status filter by including only NEW transactions, should be 2 + p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-s", "NEW"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + result = result.lines().filter(line -> !line.matches(".*(INFO|DEBUG|WARN|ERROR).*")) + .collect(Collectors.joining("\n")); + report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertFalse(report.getStatusCounts().isEmpty()); + assertFalse(report.getStepCounts().isEmpty()); + assertFalse(report.getCmdCounts().isEmpty()); + assertEquals(Set.of("NEW"), report.getStatusFilterNames()); + assertTrue(report.getInstanceTypesFilterNames().isEmpty()); + assertTrue(report.getFateIdFilter().isEmpty()); + validateFateDetails(report.getFateDetails(), 2, fateIdsStarted); + + /* + * Test filtering by FateInstanceType + */ + + // validate FateInstanceType filter by only including transactions with META filter + p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-t", "META"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + result = result.lines().filter(line -> !line.matches(".*(INFO|DEBUG|WARN|ERROR).*")) + .collect(Collectors.joining("\n")); + report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertFalse(report.getStatusCounts().isEmpty()); + assertFalse(report.getStepCounts().isEmpty()); + 
assertFalse(report.getCmdCounts().isEmpty()); + assertTrue(report.getStatusFilterNames().isEmpty()); + assertEquals(Set.of("META"), report.getInstanceTypesFilterNames()); + assertTrue(report.getFateIdFilter().isEmpty()); + if (store.type() == FateInstanceType.META) { + validateFateDetails(report.getFateDetails(), 2, fateIdsStarted); + } else { // USER + validateFateDetails(report.getFateDetails(), 0, fateIdsStarted); + } + + // validate FateInstanceType filter by only including transactions with USER filter + p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-t", "USER"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + result = result.lines().filter(line -> !line.matches(".*(INFO|DEBUG|WARN|ERROR).*")) + .collect(Collectors.joining("\n")); + report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertFalse(report.getStatusCounts().isEmpty()); + assertFalse(report.getStepCounts().isEmpty()); + assertFalse(report.getCmdCounts().isEmpty()); + assertTrue(report.getStatusFilterNames().isEmpty()); + assertEquals(Set.of("USER"), report.getInstanceTypesFilterNames()); + assertTrue(report.getFateIdFilter().isEmpty()); + if (store.type() == FateInstanceType.META) { + validateFateDetails(report.getFateDetails(), 0, fateIdsStarted); + } else { // USER + validateFateDetails(report.getFateDetails(), 2, fateIdsStarted); + } + + fate.shutdown(1, TimeUnit.MINUTES); + } + + @Test + public void testFateSummaryCommandPlainText() throws Exception { + executeTest(this::testFateSummaryCommandPlainText); + } + + protected void testFateSummaryCommandPlainText(FateStore<LatchTestEnv> store, ServerContext sctx) + throws Exception { + // Configure Fate + Fate<LatchTestEnv> fate = initFateNoDeadResCleaner(store); + + // Start some transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); + + ProcessInfo p = getCluster().exec(Admin.class, "fate", 
fateId1.canonical(), fateId2.canonical(), + "--summary", "-s", "NEW", "-t", store.type().name()); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + assertTrue(result.contains("Status Filters: [NEW]")); + assertTrue(result + .contains("Fate ID Filters: [" + fateId1.canonical() + ", " + fateId2.canonical() + "]") + || result.contains( + "Fate ID Filters: [" + fateId2.canonical() + ", " + fateId1.canonical() + "]")); + assertTrue(result.contains("Instance Types Filters: [" + store.type().name() + "]")); + + fate.shutdown(1, TimeUnit.MINUTES); + } + + @Test + public void testFatePrintCommand() throws Exception { + executeTest(this::testFatePrintCommand); + } + + protected void testFatePrintCommand(FateStore<LatchTestEnv> store, ServerContext sctx) + throws Exception { + // Configure Fate + Fate<LatchTestEnv> fate = initFateNoDeadResCleaner(store); + + // validate no transactions + ProcessInfo p = getCluster().exec(Admin.class, "fate", "--print"); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + assertTrue(result.contains(" 0 transactions")); + + // create Fate transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); + + // Get all transactions. 
Should be 2 FateIds with a NEW status + p = getCluster().exec(Admin.class, "fate", "--print"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + Map<String,String> fateIdsFromResult = getFateIdsFromPrint(result); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), fateIdsFromResult); + + /* + * Test filtering by States + */ + + // Filter by NEW state + p = getCluster().exec(Admin.class, "fate", "--print", "-s", "NEW"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + fateIdsFromResult = getFateIdsFromPrint(result); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), fateIdsFromResult); + + // Filter by FAILED state + p = getCluster().exec(Admin.class, "fate", "--print", "-s", "FAILED"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + fateIdsFromResult = getFateIdsFromPrint(result); + assertTrue(fateIdsFromResult.isEmpty()); + + /* + * Test filtering by FateIds + */ + + // Filter by one FateId + p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--print"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + fateIdsFromResult = getFateIdsFromPrint(result); + assertEquals(Map.of(fateId1.canonical(), "NEW"), fateIdsFromResult); + + // Filter by both FateIds + p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), fateId2.canonical(), "--print"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + fateIdsFromResult = getFateIdsFromPrint(result); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), fateIdsFromResult); + + // Filter by non-existent FateId + FateId fakeFateId = FateId.from(store.type(), UUID.randomUUID()); + p = getCluster().exec(Admin.class, "fate", fakeFateId.canonical(), "--print"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + fateIdsFromResult = getFateIdsFromPrint(result); + assertEquals(0, 
fateIdsFromResult.size()); + + /* + * Test filtering by FateInstanceType + */ + + // Test filter by USER FateInstanceType + p = getCluster().exec(Admin.class, "fate", "--print", "-t", "USER"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + fateIdsFromResult = getFateIdsFromPrint(result); + if (store.type() == FateInstanceType.META) { + assertTrue(fateIdsFromResult.isEmpty()); + } else { // USER + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromResult); + } + + // Test filter by META FateInstanceType + p = getCluster().exec(Admin.class, "fate", "--print", "-t", "META"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + fateIdsFromResult = getFateIdsFromPrint(result); + if (store.type() == FateInstanceType.META) { + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromResult); + } else { // USER + assertTrue(fateIdsFromResult.isEmpty()); + } + + fate.shutdown(1, TimeUnit.MINUTES); + } + + @Test + public void testTransactionNameAndStep() throws Exception { + executeTest(this::testTransactionNameAndStep); + } + + protected void testTransactionNameAndStep(FateStore<LatchTestEnv> store, ServerContext sctx) + throws Exception { + // Since the other tests just use NEW transactions for simplicity, there are some fields of the + // summary and print outputs which are null and not tested for (transaction name and transaction + // step). This test uses seeded/in progress transactions to test that the summary and print + // commands properly output these fields. 
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + final String table = getUniqueNames(1)[0]; + + IteratorSetting is = new IteratorSetting(1, SlowIterator.class); + is.addOption("sleepTime", "10000"); + + NewTableConfiguration cfg = new NewTableConfiguration(); + cfg.attachIterator(is, EnumSet.of(IteratorUtil.IteratorScope.majc)); + client.tableOperations().create(table, cfg); + + ReadWriteIT.ingest(client, 10, 10, 10, 0, table); + client.tableOperations().flush(table, null, null, true); + + // create 2 Fate transactions + client.tableOperations().compact(table, null, null, false, false); + client.tableOperations().compact(table, null, null, false, false); + List<String> fateIdsStarted = new ArrayList<>(); + + ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary", "-j"); + assertEquals(0, p.getProcess().waitFor()); + + String result = p.readStdOut(); + result = result.lines().filter(line -> !line.matches(".*(INFO|DEBUG|WARN|ERROR).*")) + .collect(Collectors.joining("\n")); + FateSummaryReport report = FateSummaryReport.fromJson(result); + + // Validate transaction name and transaction step from summary command + + for (FateTxnDetails d : report.getFateDetails()) { + assertEquals("TABLE_COMPACT", d.getFateOp()); + assertEquals("CompactionDriver", d.getStep()); + fateIdsStarted.add(d.getFateId()); + } + assertEquals(2, fateIdsStarted.size()); + + p = getCluster().exec(Admin.class, "fate", "--print"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + + // Validate transaction name and transaction step from print command + + String[] lines = result.split("\n"); + // Filter out the result to just include the info about the transactions + List<String> transactionInfo = Arrays.stream(lines) + .filter( + line -> line.contains(fateIdsStarted.get(0)) || line.contains(fateIdsStarted.get(1))) + .collect(Collectors.toList()); + assertEquals(2, transactionInfo.size()); + for (String info : 
transactionInfo) { + assertTrue(info.contains("TABLE_COMPACT")); + assertTrue(info.contains("op: CompactionDriver")); + } + + client.tableOperations().delete(table); + } + } + + @Test + public void testFateCancelCommand() throws Exception { + executeTest(this::testFateCancelCommand); + } + + protected void testFateCancelCommand(FateStore<LatchTestEnv> store, ServerContext sctx) + throws Exception { + // Configure Fate + Fate<LatchTestEnv> fate = initFateNoDeadResCleaner(store); + + // Start some transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); + + // Check that summary output lists both the transactions with a NEW status + Map<String,String> fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + // Cancel the first transaction and ensure that it was cancelled + ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--cancel"); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + + assertTrue(result + .contains("transaction " + fateId1.canonical() + " was cancelled or already completed")); + fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "FAILED", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + fate.shutdown(1, TimeUnit.MINUTES); + } + + @Test + public void testFateFailCommandTimeout() throws Exception { + stopManagerAndExecuteTest(this::testFateFailCommandTimeout); + } + + protected void testFateFailCommandTimeout(FateStore<LatchTestEnv> store, ServerContext sctx) + throws Exception { + // Configure Fate + LatchTestEnv env = new LatchTestEnv(); + FastFate<LatchTestEnv> fate = initFateWithDeadResCleaner(store, env); + + // Start some transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); + + // Check that summary output lists both the transactions with a NEW status + 
Map<String,String> fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + // Seed the transaction with the latch repo, so we can have an IN_PROGRESS transaction + fate.seedTransaction(TEST_FATE_OP, fateId1, new LatchTestRepo(), true, "test"); + // Wait for 'fate' to reserve fateId1 (will be IN_PROGRESS on fateId1) + Wait.waitFor(() -> env.numWorkers.get() == 1); + + // Try to fail fateId1 + // This should not work as it is already reserved and being worked on by our running FATE + // ('fate'). Admin should try to reserve it for a bit, but should fail and exit + ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--fail"); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + + assertTrue(result.contains("Could not fail " + fateId1 + " in a reasonable time")); + fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "IN_PROGRESS", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + // Finish work and shutdown + env.workersLatch.countDown(); + fate.shutdown(1, TimeUnit.MINUTES); + } + + @Test + public void testFateFailCommandSuccess() throws Exception { + executeTest(this::testFateFailCommandSuccess); + } + + protected void testFateFailCommandSuccess(FateStore<LatchTestEnv> store, ServerContext sctx) + throws Exception { + // Configure Fate + Fate<LatchTestEnv> fate = initFateNoDeadResCleaner(store); + + // Start some transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); + + // Check that summary output lists both the transactions with a NEW status + Map<String,String> fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + // Try to fail fateId1 + // This should work since nothing has fateId1 reserved (it is NEW) + ProcessInfo p = 
getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--fail"); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + + assertTrue(result.contains("Failing transaction: " + fateId1)); + fateIdsFromSummary = getFateIdsFromSummary(); + assertTrue(fateIdsFromSummary + .equals(Map.of(fateId1.canonical(), "FAILED_IN_PROGRESS", fateId2.canonical(), "NEW")) + || fateIdsFromSummary + .equals(Map.of(fateId1.canonical(), "FAILED", fateId2.canonical(), "NEW"))); + + fate.shutdown(1, TimeUnit.MINUTES); + } + + @Test + public void testFateDeleteCommandTimeout() throws Exception { + stopManagerAndExecuteTest(this::testFateDeleteCommandTimeout); + } + + protected void testFateDeleteCommandTimeout(FateStore<LatchTestEnv> store, ServerContext sctx) + throws Exception { + // Configure Fate + LatchTestEnv env = new LatchTestEnv(); + FastFate<LatchTestEnv> fate = initFateWithDeadResCleaner(store, env); + + // Start some transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); + + // Check that summary output lists both the transactions with a NEW status + Map<String,String> fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + // Seed the transaction with the latch repo, so we can have an IN_PROGRESS transaction + fate.seedTransaction(TEST_FATE_OP, fateId1, new LatchTestRepo(), true, "test"); + // Wait for 'fate' to reserve fateId1 (will be IN_PROGRESS on fateId1) + Wait.waitFor(() -> env.numWorkers.get() == 1); + + // Try to delete fateId1 + // This should not work as it is already reserved and being worked on by our running FATE + // ('fate'). 
Admin should try to reserve it for a bit, but should fail and exit + ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--delete"); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + + assertTrue(result.contains("Could not delete " + fateId1 + " in a reasonable time")); + fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "IN_PROGRESS", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + // Finish work and shutdown + env.workersLatch.countDown(); + fate.shutdown(1, TimeUnit.MINUTES); + } + + @Test + public void testFateDeleteCommandSuccess() throws Exception { + executeTest(this::testFateDeleteCommandSuccess); + } + + protected void testFateDeleteCommandSuccess(FateStore<LatchTestEnv> store, ServerContext sctx) + throws Exception { + // Configure Fate + Fate<LatchTestEnv> fate = initFateNoDeadResCleaner(store); + + // Start some transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); + + // Check that summary output lists both the transactions with a NEW status + Map<String,String> fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + // Try to delete fateId1 + // This should work since nothing has fateId1 reserved (it is NEW) + ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--delete"); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + + assertTrue(result.contains("Deleting transaction: " + fateId1)); + fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId2.canonical(), "NEW"), fateIdsFromSummary); + + fate.shutdown(1, TimeUnit.MINUTES); + } + + @Test + public void testFatePrintAndSummaryCommandsWithInProgressTxns() throws Exception { + executeTest(this::testFatePrintAndSummaryCommandsWithInProgressTxns); + } + + protected void 
testFatePrintAndSummaryCommandsWithInProgressTxns(FateStore<LatchTestEnv> store, + ServerContext sctx) throws Exception { + // This test was written for an issue with the 'admin fate --print' and 'admin fate --summary' + // commands where transactions could complete mid-print causing the command to fail. These + // commands first get a list of the transactions and then probe for info on the transactions. + // If a transaction completed between getting the list and probing for info on that + // transaction, the command would fail. This test ensures that this problem has been fixed + // (if the tx no longer exists, it should just be ignored so the print/summary can complete). + FateStore<LatchTestEnv> mockedStore; + + // This error was occurring in AdminUtil.getTransactionStatus(), so we will test this method. + if (store.type().equals(FateInstanceType.USER)) { + Method listMethod = UserFateStore.class.getMethod("list"); + mockedStore = EasyMock.createMockBuilder(UserFateStore.class) + .withConstructor(ClientContext.class, String.class, ZooUtil.LockID.class, Predicate.class) + .withArgs(sctx, AccumuloTable.FATE.tableName(), null, null).addMockedMethod(listMethod) + .createMock(); + } else { + Method listMethod = MetaFateStore.class.getMethod("list"); + mockedStore = EasyMock.createMockBuilder(MetaFateStore.class) + .withConstructor(ZooSession.class, ZooUtil.LockID.class, Predicate.class) + .withArgs(sctx.getZooSession(), null, null).addMockedMethod(listMethod).createMock(); + } + + // 3 FateIds, two that exist and one that does not. We are simulating that a transaction that + // doesn't exist is accessed in getTransactionStatus() and ensuring that this doesn't cause + // the method to fail or have any unexpected behavior. 
+ FateId tx1 = store.create(); + FateId tx2 = FateId.from(store.type(), UUID.randomUUID()); + FateId tx3 = store.create(); + + List<ReadOnlyFateStore.FateIdStatus> fateIdStatusList = + List.of(createFateIdStatus(tx1), createFateIdStatus(tx2), createFateIdStatus(tx3)); + expect(mockedStore.list()).andReturn(fateIdStatusList.stream()).once(); + + replay(mockedStore); + + AdminUtil.FateStatus status = null; + try { + status = AdminUtil.getTransactionStatus(Map.of(store.type(), mockedStore), null, null, null, + new HashMap<>(), new HashMap<>()); + } catch (Exception e) { + fail("An unexpected error occurred in getTransactionStatus():\n" + e); + } + + verify(mockedStore); + + assertNotNull(status); + // All three should be returned + assertEquals(3, status.getTransactions().size()); + assertEquals(status.getTransactions().stream().map(AdminUtil.TransactionStatus::getFateId) + .collect(Collectors.toList()), List.of(tx1, tx2, tx3)); + // The two real FateIds should have NEW status and the fake one should be UNKNOWN + assertEquals( + status.getTransactions().stream().map(AdminUtil.TransactionStatus::getStatus) + .collect(Collectors.toList()), + List.of(ReadOnlyFateStore.TStatus.NEW, ReadOnlyFateStore.TStatus.UNKNOWN, + ReadOnlyFateStore.TStatus.NEW)); + // None of them should have a name since none of them were seeded with work + assertEquals(status.getTransactions().stream().map(AdminUtil.TransactionStatus::getFateOp) + .collect(Collectors.toList()), Arrays.asList(null, null, null)); + // None of them should have a Repo since none of them were seeded with work + assertEquals(status.getTransactions().stream().map(AdminUtil.TransactionStatus::getTop) + .collect(Collectors.toList()), Arrays.asList(null, null, null)); + // The FateId that doesn't exist should have a creation time of 0, the others should not + List<Long> timeCreated = status.getTransactions().stream() + .map(AdminUtil.TransactionStatus::getTimeCreated).collect(Collectors.toList()); + 
assertNotEquals(timeCreated.get(0), 0); + assertEquals(timeCreated.get(1), 0); + assertNotEquals(timeCreated.get(2), 0); + // All should have the store.type() type + assertEquals(status.getTransactions().stream().map(AdminUtil.TransactionStatus::getInstanceType) + .collect(Collectors.toList()), List.of(store.type(), store.type(), store.type())); + } + + private ReadOnlyFateStore.FateIdStatus createFateIdStatus(FateId fateId) { + // We are only using the fateId from this, so null/empty is fine for the rest + return new AbstractFateStore.FateIdStatusBase(fateId) { + @Override + public ReadOnlyFateStore.TStatus getStatus() { + return null; + } + + @Override + public Optional<FateStore.FateReservation> getFateReservation() { + return Optional.empty(); + } + + @Override + public Optional<Fate.FateOperation> getFateOperation() { + return Optional.empty(); + } + }; + } + + /** + * + * @param printResult the output of the --print fate command + * @return a map of each of the FateIds to their status using the output of --print + */ + private Map<String,String> getFateIdsFromPrint(String printResult) { + Map<String,String> fateIdToStatus = new HashMap<>(); + String lastFateIdSeen = null; + String[] words = printResult.split(" "); + for (String word : words) { + if (FateId.isFateId(word)) { + if (!fateIdToStatus.containsKey(word)) { + lastFateIdSeen = word; + } else { + log.debug( + "--print listed the same transaction more than once. 
This should not occur, failing"); + fail(); + } + } else if (wordIsTStatus(word)) { + fateIdToStatus.put(lastFateIdSeen, word); + } + } + return fateIdToStatus; + } + + /** + * + * @return a map of each of the FateIds to their status using the --summary command + */ + private Map<String,String> getFateIdsFromSummary() throws Exception { + ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary", "-j"); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + result = result.lines().filter(line -> !line.matches(".*(INFO|DEBUG|WARN|ERROR).*")) + .collect(Collectors.joining("\n")); + FateSummaryReport report = FateSummaryReport.fromJson(result); + assertNotNull(report); + Map<String,String> fateIdToStatus = new HashMap<>(); + report.getFateDetails().forEach((d) -> { + fateIdToStatus.put(d.getFateId(), d.getStatus()); + }); + return fateIdToStatus; + } + + /** + * Validates the fate details of NEW transactions + * + * @param details the fate details from the {@link FateSummaryReport} + * @param expDetailsSize the expected size of details + * @param fateIdsStarted the list of fate ids that have been started + */ + private void validateFateDetails(Set<FateTxnDetails> details, int expDetailsSize, + List<String> fateIdsStarted) { + assertEquals(expDetailsSize, details.size()); + for (FateTxnDetails d : details) { + assertTrue(fateIdsStarted.contains(d.getFateId())); + assertEquals("NEW", d.getStatus()); + assertEquals("?", d.getStep()); + assertEquals("?", d.getFateOp()); + assertNotEquals(0, d.getRunning()); + assertEquals("[]", d.getLocksHeld().toString()); + assertEquals("[]", d.getLocksWaiting().toString()); + } + } + + protected FastFate<LatchTestEnv> initFateWithDeadResCleaner(FateStore<LatchTestEnv> store, + LatchTestEnv env) { + // Using FastFate so the cleanup will run often. 
This ensures that the cleanup will run when + // there are reservations present and that the cleanup will not unexpectedly delete these live + // reservations + return new FastFate<>(env, store, true, Object::toString, DefaultConfiguration.getInstance()); + } + + protected Fate<LatchTestEnv> initFateNoDeadResCleaner(FateStore<LatchTestEnv> store) { + return new Fate<>(new LatchTestEnv(), store, false, Object::toString, - DefaultConfiguration.getInstance()); ++ DefaultConfiguration.getInstance(), new ScheduledThreadPoolExecutor(2)); + } + + private boolean wordIsTStatus(String word) { + try { + ReadOnlyFateStore.TStatus.valueOf(word); + } catch (IllegalArgumentException e) { + return false; + } + return true; + } + + /** + * Stop the MANAGER. For some of our tests, we want to be able to seed transactions with our own + * test repos. We want our fate to reserve these transactions (and not the real fates running in + * the Manager as that will lead to exceptions since the real fates wouldn't be able to handle our + * test repos). So, we essentially have the fates created here acting as the real fates: they have + * the same threads running that the real fates would, use a fate store with a ZK lock, use the + * same locations to store fate data that the Manager does, and are running in a separate process + * from the Admin process. Note that we cannot simply use different locations for our fate data + * from Manager to keep our test env separate from Manager. Admin uses the real fate data + * locations, so our test must also use the real locations. 
+ */ + protected void stopManager() throws IOException { + getCluster().getClusterControl().stopAllServers(ServerType.MANAGER); + } +} diff --cc test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java index 7ff60e5bed,424248b590..15e429d581 --- a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java @@@ -18,15 -18,11 +18,16 @@@ */ package org.apache.accumulo.test.fate; +import java.util.Set; ++import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.function.Function; +import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateExecutor; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.Repo; -import org.apache.accumulo.core.fate.TStore; import com.google.common.base.Preconditions; @@@ -35,9 -31,8 +36,9 @@@ */ public class FlakyFate<T> extends Fate<T> { - public FlakyFate(T environment, TStore<T> store, Function<Repo<T>,String> toLogStrFunc) { - super(environment, store, toLogStrFunc); + public FlakyFate(T environment, FateStore<T> store, Function<Repo<T>,String> toLogStrFunc, + AccumuloConfiguration conf) { - super(environment, store, false, toLogStrFunc, conf); ++ super(environment, store, false, toLogStrFunc, conf, new ScheduledThreadPoolExecutor(2)); } @Override diff --cc test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java index 472f9263c9,0000000000..0637b520a2 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java @@@ -1,475 -1,0 +1,477 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import static org.apache.accumulo.test.fate.FateTestUtil.TEST_FATE_OP; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; ++import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import 
org.apache.accumulo.test.util.Wait; +import org.apache.zookeeper.KeeperException; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Sets; + +public abstract class MultipleStoresIT extends SharedMiniClusterBase { + + private static final Logger LOG = LoggerFactory.getLogger(MultipleStoresIT.class); + + @Test + public void testReserveUnreserve() throws Exception { + executeSleepingEnvTest(this::testReserveUnreserve); + } + + private void testReserveUnreserve(TestStoreFactory<SleepingTestEnv> testStoreFactory) + throws Exception { + // reserving/unreserving a FateId should be reflected across instances of the stores + final int numFateIds = 500; + final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>(); + final Set<FateId> allIds = new HashSet<>(); + final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50); + final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52); + Map<FateId,FateStore.FateReservation> activeReservations; + final FateStore<SleepingTestEnv> store1 = testStoreFactory.create(lock1, null); + final FateStore<SleepingTestEnv> store2 = testStoreFactory.create(lock2, null); + final FateId fakeFateId = FateId.from(store1.type(), UUID.randomUUID()); + + // Create the fate ids using store1 + for (int i = 0; i < numFateIds; i++) { + assertTrue(allIds.add(store1.create())); + } + assertEquals(numFateIds, allIds.size()); + + // Reserve half the fate ids using store1 and rest using store2, after reserving a fate id in + // one, should not be able to reserve the same in the other. 
Should also not matter that all the + // ids were created using store1 + int count = 0; + for (FateId fateId : allIds) { + if (count % 2 == 0) { + reservations.add(store1.reserve(fateId)); + assertTrue(store2.tryReserve(fateId).isEmpty()); + } else { + reservations.add(store2.reserve(fateId)); + assertTrue(store1.tryReserve(fateId).isEmpty()); + } + count++; + } + // Try to reserve a non-existent fate id + assertTrue(store1.tryReserve(fakeFateId).isEmpty()); + assertTrue(store2.tryReserve(fakeFateId).isEmpty()); + // Both stores should return the same reserved transactions + activeReservations = store1.getActiveReservations(); + assertEquals(allIds, activeReservations.keySet()); + activeReservations = store2.getActiveReservations(); + assertEquals(allIds, activeReservations.keySet()); + + // Test setting/getting the TStatus and unreserving the transactions + for (int i = 0; i < allIds.size(); i++) { + var reservation = reservations.get(i); + assertEquals(ReadOnlyFateStore.TStatus.NEW, reservation.getStatus()); + reservation.setStatus(ReadOnlyFateStore.TStatus.SUBMITTED); + assertEquals(ReadOnlyFateStore.TStatus.SUBMITTED, reservation.getStatus()); + reservation.delete(); + reservation.unreserve(Duration.ofMillis(0)); + // Attempt to set a status on a tx that has been unreserved (should throw exception) + assertThrows(IllegalStateException.class, + () -> reservation.setStatus(ReadOnlyFateStore.TStatus.NEW)); + } + assertTrue(store1.getActiveReservations().isEmpty()); + assertTrue(store2.getActiveReservations().isEmpty()); + } + + @Test + public void testReserveNonExistentTxn() throws Exception { + executeSleepingEnvTest(this::testReserveNonExistentTxn); + } + + private void testReserveNonExistentTxn(TestStoreFactory<SleepingTestEnv> testStoreFactory) + throws Exception { + // Tests that reserve() doesn't hang indefinitely and instead throws an error + // on reserve() a non-existent transaction. 
+ final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50); + final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock, null); + final FateId fakeFateId = FateId.from(store.type(), UUID.randomUUID()); + + var err = assertThrows(IllegalStateException.class, () -> store.reserve(fakeFateId)); + assertTrue(err.getMessage().contains(fakeFateId.canonical())); + } + + @Test + public void testReserveReservedAndUnreserveUnreserved() throws Exception { + executeSleepingEnvTest(this::testReserveReservedAndUnreserveUnreserved); + } + + private void testReserveReservedAndUnreserveUnreserved( + TestStoreFactory<SleepingTestEnv> testStoreFactory) throws Exception { + final int numFateIds = 500; + final Set<FateId> allIds = new HashSet<>(); + final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>(); + final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50); + final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock, null); + + // Create some FateIds and ensure that they can be reserved + for (int i = 0; i < numFateIds; i++) { + FateId fateId = store.create(); + assertTrue(allIds.add(fateId)); + var reservation = store.tryReserve(fateId); + assertFalse(reservation.isEmpty()); + reservations.add(reservation.orElseThrow()); + } + assertEquals(numFateIds, allIds.size()); + + // Try to reserve again, should not reserve + for (FateId fateId : allIds) { + assertTrue(store.tryReserve(fateId).isEmpty()); + } + + // Unreserve all the FateIds + for (var reservation : reservations) { + reservation.unreserve(Duration.ofMillis(0)); + } + // Try to unreserve again (should throw exception) + for (var reservation : reservations) { + assertThrows(IllegalStateException.class, () -> reservation.unreserve(Duration.ofMillis(0))); + } + } + + @Test + public void testReserveAfterUnreserveAndReserveAfterDeleted() throws Exception { + executeSleepingEnvTest(this::testReserveAfterUnreserveAndReserveAfterDeleted); + } + + private void 
testReserveAfterUnreserveAndReserveAfterDeleted( + TestStoreFactory<SleepingTestEnv> testStoreFactory) throws Exception { + final int numFateIds = 500; + final Set<FateId> allIds = new HashSet<>(); + final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>(); + final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50); + final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock, null); + + // Create some FateIds and ensure that they can be reserved + for (int i = 0; i < numFateIds; i++) { + FateId fateId = store.create(); + assertTrue(allIds.add(fateId)); + var reservation = store.tryReserve(fateId); + assertFalse(reservation.isEmpty()); + reservations.add(reservation.orElseThrow()); + } + assertEquals(numFateIds, allIds.size()); + + // Unreserve all + for (var reservation : reservations) { + reservation.unreserve(Duration.ofMillis(0)); + } + + // Ensure they can be reserved again, and delete and unreserve this time + for (FateId fateId : allIds) { + // Verify that the tx status is still NEW after unreserving since it hasn't been deleted + assertEquals(ReadOnlyFateStore.TStatus.NEW, store.read(fateId).getStatus()); + var reservation = store.tryReserve(fateId); + assertFalse(reservation.isEmpty()); + reservation.orElseThrow().delete(); + reservation.orElseThrow().unreserve(Duration.ofMillis(0)); + } + + for (FateId fateId : allIds) { + // Verify that the tx is now unknown since it has been deleted + assertEquals(ReadOnlyFateStore.TStatus.UNKNOWN, store.read(fateId).getStatus()); + // Attempt to reserve a deleted txn, should throw an exception and not wait indefinitely + var err = assertThrows(IllegalStateException.class, () -> store.reserve(fateId)); + assertTrue(err.getMessage().contains(fateId.canonical())); + } + } + + @Test + public void testMultipleFateInstances() throws Exception { + executeSleepingEnvTest(this::testMultipleFateInstances); + } + + private void 
testMultipleFateInstances(TestStoreFactory<SleepingTestEnv> testStoreFactory) + throws Exception { + final int numFateIds = 500; + final Set<FateId> allIds = new HashSet<>(); + final SleepingTestEnv testEnv1 = new SleepingTestEnv(50); + final SleepingTestEnv testEnv2 = new SleepingTestEnv(50); + final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50); + final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52); + final Set<ZooUtil.LockID> liveLocks = new HashSet<>(); + final Predicate<ZooUtil.LockID> isLockHeld = liveLocks::contains; + final FateStore<SleepingTestEnv> store1 = testStoreFactory.create(lock1, isLockHeld); + final FateStore<SleepingTestEnv> store2 = testStoreFactory.create(lock2, isLockHeld); + + liveLocks.add(lock1); + liveLocks.add(lock2); + - Fate<SleepingTestEnv> fate1 = - new Fate<>(testEnv1, store1, true, Object::toString, DefaultConfiguration.getInstance()); - Fate<SleepingTestEnv> fate2 = - new Fate<>(testEnv2, store2, false, Object::toString, DefaultConfiguration.getInstance()); ++ Fate<SleepingTestEnv> fate1 = new Fate<>(testEnv1, store1, true, Object::toString, ++ DefaultConfiguration.getInstance(), new ScheduledThreadPoolExecutor(2)); ++ Fate<SleepingTestEnv> fate2 = new Fate<>(testEnv2, store2, false, Object::toString, ++ DefaultConfiguration.getInstance(), new ScheduledThreadPoolExecutor(2)); + + try { + for (int i = 0; i < numFateIds; i++) { + FateId fateId; + // Start half the txns using fate1, and the other half using fate2 + if (i % 2 == 0) { + fateId = fate1.startTransaction(); + fate1.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(), true, "test"); + } else { + fateId = fate2.startTransaction(); + fate2.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(), true, "test"); + } + allIds.add(fateId); + } + assertEquals(numFateIds, allIds.size()); + + // Should be able to wait for completion on any fate instance + for (FateId fateId : allIds) { + fate2.waitForCompletion(fateId); + } + // Ensure 
that all txns have been executed and have only been executed once + assertTrue(Collections.disjoint(testEnv1.executedOps, testEnv2.executedOps)); + assertEquals(allIds, Sets.union(testEnv1.executedOps, testEnv2.executedOps)); + } finally { + fate1.shutdown(1, TimeUnit.MINUTES); + fate2.shutdown(1, TimeUnit.MINUTES); + } + } + + @Test + public void testDeadReservationsCleanup() throws Exception { + executeLatchEnvTest(this::testDeadReservationsCleanup); + } + + private void testDeadReservationsCleanup(TestStoreFactory<LatchTestEnv> testStoreFactory) + throws Exception { + // Tests reserving some transactions, then simulating that the Manager died by creating + // a new Fate instance and store with a new LockID. The transactions which were + // reserved using the old LockID should be cleaned up by Fate's DeadReservationCleaner, + // then picked up by the new Fate/store. + + // > 1 to have some concurrency to simulate realistic fate scenario + final int numThreads = 10; + // One transaction for each FATE worker thread + final int numFateIds = numThreads; + final Set<FateId> allIds = new HashSet<>(); + final LatchTestEnv testEnv1 = new LatchTestEnv(); + final LatchTestEnv testEnv2 = new LatchTestEnv(); + final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50); + final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52); + final Set<ZooUtil.LockID> liveLocks = new HashSet<>(); + final Predicate<ZooUtil.LockID> isLockHeld = liveLocks::contains; + final AccumuloConfiguration config = FateTestUtil.createTestFateConfig(numThreads); + Map<FateId,FateStore.FateReservation> reservations; + + final FateStore<LatchTestEnv> store1 = testStoreFactory.create(lock1, isLockHeld); + liveLocks.add(lock1); + Fate<LatchTestEnv> fate1 = null; + Fate<LatchTestEnv> fate2 = null; + + try { + fate1 = new FastFate<>(testEnv1, store1, true, Object::toString, config); + // Ensure nothing is reserved yet + assertTrue(store1.getActiveReservations().isEmpty()); + + // Create 
transactions + for (int i = 0; i < numFateIds; i++) { + FateId fateId; + fateId = fate1.startTransaction(); + fate1.seedTransaction(TEST_FATE_OP, fateId, new LatchTestRepo(), true, "test"); + allIds.add(fateId); + } + assertEquals(numFateIds, allIds.size()); + + // Wait for all the fate worker threads to start working on the transactions + Wait.waitFor(() -> testEnv1.numWorkers.get() == numFateIds); + // Each fate worker will be hung up working (IN_PROGRESS) on a single transaction + + // Verify store1 has the transactions reserved and that they were reserved with lock1 + reservations = store1.getActiveReservations(); + assertEquals(allIds, reservations.keySet()); + reservations.values().forEach(res -> assertEquals(lock1, res.getLockID())); + + final FateStore<LatchTestEnv> store2 = testStoreFactory.create(lock2, isLockHeld); + + // Verify store2 can see the reserved transactions even though they were reserved using + // store1 + reservations = store2.getActiveReservations(); + assertEquals(allIds, reservations.keySet()); + reservations.values().forEach(res -> assertEquals(lock1, res.getLockID())); + + // Simulate what would happen if the Manager using the Fate object (fate1) died. + // isLockHeld would return false for the LockId of the Manager that died (in this case, lock1) + // and true for the new Manager's lock (lock2) + liveLocks.remove(lock1); + liveLocks.add(lock2); + + // Create the new Fate/start the Fate threads (the work finder and the workers). + // Don't run another dead reservation cleaner since we already have one running from fate1. - fate2 = new Fate<>(testEnv2, store2, false, Object::toString, config); ++ fate2 = new Fate<>(testEnv2, store2, false, Object::toString, config, ++ new ScheduledThreadPoolExecutor(2)); + + // Wait for the "dead" reservations to be deleted and picked up again (reserved using + // fate2/store2/lock2 now). + // They are considered "dead" if they are held by lock1 in this test. 
We don't have to worry + // about fate1/store1/lock1 being used to reserve the transactions again since all + // the workers for fate1 are hung up + Wait.waitFor(() -> { + Map<FateId,FateStore.FateReservation> store2Reservations = store2.getActiveReservations(); + boolean allReservedWithLock2 = + store2Reservations.values().stream().allMatch(entry -> entry.getLockID().equals(lock2)); + return store2Reservations.keySet().equals(allIds) && allReservedWithLock2; + }, fate1.getDeadResCleanupDelay().toMillis() * 2); + } finally { + // Finish work and shutdown + testEnv1.workersLatch.countDown(); + testEnv2.workersLatch.countDown(); + if (fate1 != null) { + fate1.shutdown(1, TimeUnit.MINUTES); + } + if (fate2 != null) { + fate2.shutdown(1, TimeUnit.MINUTES); + } + } + } + + public static class SleepingTestRepo implements Repo<SleepingTestEnv> { + private static final long serialVersionUID = 1L; + + @Override + public long isReady(FateId fateId, SleepingTestEnv environment) { + return 0; + } + + @Override + public String getName() { + return null; + } + + @Override + public Repo<SleepingTestEnv> call(FateId fateId, SleepingTestEnv environment) throws Exception { + environment.executedOps.add(fateId); + LOG.debug("Thread " + Thread.currentThread() + " in SleepingTestRepo.call() sleeping for " + + environment.sleepTimeMs + " millis"); + Thread.sleep(environment.sleepTimeMs); // Simulate some work + LOG.debug("Thread " + Thread.currentThread() + " finished SleepingTestRepo.call()"); + return null; + } + + @Override + public void undo(FateId fateId, SleepingTestEnv environment) { + + } + + @Override + public String getReturn() { + return null; + } + } + + public static class SleepingTestEnv extends MultipleStoresTestEnv { + public final Set<FateId> executedOps = Collections.synchronizedSet(new HashSet<>()); + public final int sleepTimeMs; + + public SleepingTestEnv(int sleepTimeMs) { + this.sleepTimeMs = sleepTimeMs; + } + } + + public static class LatchTestRepo implements 
Repo<LatchTestEnv> { + private static final long serialVersionUID = 1L; + + @Override + public long isReady(FateId fateId, LatchTestEnv environment) { + return 0; + } + + @Override + public String getName() { + return null; + } + + @Override + public Repo<LatchTestEnv> call(FateId fateId, LatchTestEnv environment) throws Exception { + LOG.debug("Thread " + Thread.currentThread() + " in LatchTestRepo.call()"); + environment.numWorkers.incrementAndGet(); + environment.workersLatch.await(); + LOG.debug("Thread " + Thread.currentThread() + " finished LatchTestRepo.call()"); + environment.numWorkers.decrementAndGet(); + return null; + } + + @Override + public void undo(FateId fateId, LatchTestEnv environment) { + + } + + @Override + public String getReturn() { + return null; + } + } + + public static class LatchTestEnv extends MultipleStoresTestEnv { + public final AtomicInteger numWorkers = new AtomicInteger(0); + public final CountDownLatch workersLatch = new CountDownLatch(1); + } + + protected abstract void executeSleepingEnvTest( + MultipleStoresTestExecutor<SleepingTestEnv> testMethod) throws Exception; + + protected abstract void executeLatchEnvTest(MultipleStoresTestExecutor<LatchTestEnv> testMethod) + throws Exception; + + protected interface TestStoreFactory<T extends MultipleStoresTestEnv> { + FateStore<T> create(ZooUtil.LockID lockID, Predicate<ZooUtil.LockID> isLockHeld) + throws InterruptedException, KeeperException; + } + + @FunctionalInterface + protected interface MultipleStoresTestExecutor<T extends MultipleStoresTestEnv> { + void execute(TestStoreFactory<T> fateStoreFactory) throws Exception; + } + + protected static class MultipleStoresTestEnv extends FateTestRunner.TestEnv {} +}