This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new c4a6ee7051 improves fate execution of in progress transactions (#4589) c4a6ee7051 is described below commit c4a6ee7051f5b1559e71e3abfab524717fa18494 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri May 24 12:50:33 2024 -0400 improves fate execution of in progress transactions (#4589) There are a few improvements to FATE in this PR. The first is that Fate now continues to execute a transaction as long as it's ready. Before, it would do one step of a transaction, persist the result, and stop, meaning the next step would not run until the next full scan. The second change was to make finding runnable transactions look for in-progress transactions first in the persisted store, followed by the non-in-progress ones. This change causes the search for runnable transactions to scan the store twice. To make this more efficient, a locality group and a filtering iterator were added. This should make the scans looking for fate transactions with a certain status much more efficient. These two improvements should make fate a bit more responsive for getting existing work done. 
Co-authored-by: Dave Marion <dlmar...@apache.org> --- .../accumulo/core/fate/AbstractFateStore.java | 19 +- .../java/org/apache/accumulo/core/fate/Fate.java | 96 ++--- .../org/apache/accumulo/core/fate/FateCleaner.java | 2 +- .../apache/accumulo/core/fate/MetaFateStore.java | 14 +- .../accumulo/core/fate/ReadOnlyFateStore.java | 15 +- .../accumulo/core/fate/user/FateStatusFilter.java | 71 ++++ .../accumulo/core/fate/user/UserFateStore.java | 4 +- .../apache/accumulo/core/logging/FateLogger.java | 6 + .../org/apache/accumulo/core/fate/TestStore.java | 5 + .../accumulo/server/init/InitialConfiguration.java | 6 + .../accumulo/manager/tableOps/split/PreSplit.java | 5 - .../accumulo/test/fate/FateInterleavingIT.java | 396 +++++++++++++++++++++ .../org/apache/accumulo/test/fate/FateStoreIT.java | 39 ++ .../test/fate/meta/MetaFateInterleavingIT.java | 44 +++ .../test/fate/user/UserFateInterleavingIT.java | 42 +++ 15 files changed, 709 insertions(+), 55 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 0bec78d196..5c7127c3e7 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -149,6 +149,10 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { } } + private static final Set<TStatus> IN_PROGRESS_SET = Set.of(TStatus.IN_PROGRESS); + private static final Set<TStatus> OTHER_RUNNABLE_SET = + Set.of(TStatus.SUBMITTED, TStatus.FAILED_IN_PROGRESS); + @Override public void runnable(AtomicBoolean keepWaiting, Consumer<FateId> idConsumer) { @@ -158,7 +162,11 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { final long beforeCount = unreservedRunnableCount.getCount(); final boolean beforeDeferredOverflow = deferredOverflow.get(); - try (Stream<FateIdStatus> transactions = getTransactions()) { + try 
(Stream<FateIdStatus> inProgress = getTransactions(IN_PROGRESS_SET); + Stream<FateIdStatus> other = getTransactions(OTHER_RUNNABLE_SET)) { + // read the in progress transaction first and then everything else in order to process those + // first + var transactions = Stream.concat(inProgress, other); transactions.filter(fateIdStatus -> isRunnable(fateIdStatus.getStatus())) .map(FateIdStatus::getFateId).filter(fateId -> { synchronized (AbstractFateStore.this) { @@ -213,7 +221,12 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { @Override public Stream<FateIdStatus> list() { - return getTransactions(); + return getTransactions(TStatus.ALL_STATUSES); + } + + @Override + public Stream<FateIdStatus> list(Set<TStatus> statuses) { + return getTransactions(statuses); } @Override @@ -343,7 +356,7 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { protected abstract Pair<TStatus,Optional<FateKey>> getStatusAndKey(FateId fateId); - protected abstract Stream<FateIdStatus> getTransactions(); + protected abstract Stream<FateIdStatus> getTransactions(Set<TStatus> statuses); protected abstract TStatus _getStatus(FateId fateId); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 80f543c0ba..e66287d704 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -145,8 +145,8 @@ public class Fate<T> { @Override public void run() { while (keepRunning.get()) { - long deferTime = 0; FateTxStore<T> txStore = null; + ExecutionState state = new ExecutionState(); try { var optionalopStore = reserveFateTx(); if (optionalopStore.isPresent()) { @@ -154,67 +154,81 @@ public class Fate<T> { } else { continue; } - TStatus status = txStore.getStatus(); - Repo<T> op = txStore.top(); - if (status == FAILED_IN_PROGRESS) { - processFailed(txStore, op); - } else if (status == SUBMITTED || status 
== IN_PROGRESS) { - Repo<T> prevOp = null; + state.status = txStore.getStatus(); + state.op = txStore.top(); + if (state.status == FAILED_IN_PROGRESS) { + processFailed(txStore, state.op); + } else if (state.status == SUBMITTED || state.status == IN_PROGRESS) { try { - var startTime = NanoTime.now(); - deferTime = op.isReady(txStore.getID(), environment); - log.trace("Running {}.isReady() {} took {} ms and returned {}", op.getName(), - txStore.getID(), startTime.elapsed().toMillis(), deferTime); - - // Here, deferTime is only used to determine success (zero) or failure (non-zero), - // proceeding on success and returning to the while loop on failure. - // The value of deferTime is only used as a wait time in FateStore.unreserve - if (deferTime == 0) { - prevOp = op; - if (status == SUBMITTED) { - txStore.setStatus(IN_PROGRESS); - } - - startTime = NanoTime.now(); - op = op.call(txStore.getID(), environment); - log.trace("Running {}.call() {} took {} ms and returned {}", prevOp.getName(), - txStore.getID(), startTime.elapsed().toMillis(), - op == null ? 
"null" : op.getName()); - } else { + execute(txStore, state); + if (state.op != null && state.deferTime != 0) { + // The current op is not ready to execute continue; } - + } catch (StackOverflowException e) { + // the op that failed to push onto the stack was never executed, so no need to undo + // it just transition to failed and undo the ops that executed + transitionToFailed(txStore, e); + continue; } catch (Exception e) { blockIfHadoopShutdown(txStore.getID(), e); transitionToFailed(txStore, e); continue; } - if (op == null) { + if (state.op == null) { // transaction is finished - String ret = prevOp.getReturn(); + String ret = state.prevOp.getReturn(); if (ret != null) { txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret); } txStore.setStatus(SUCCESSFUL); doCleanUp(txStore); - } else { - try { - txStore.push(op); - } catch (StackOverflowException e) { - // the op that failed to push onto the stack was never executed, so no need to undo - // it - // just transition to failed and undo the ops that executed - transitionToFailed(txStore, e); - continue; - } } } } catch (Exception e) { runnerLog.error("Uncaught exception in FATE runner thread.", e); } finally { if (txStore != null) { - txStore.unreserve(deferTime, TimeUnit.MILLISECONDS); + txStore.unreserve(state.deferTime, TimeUnit.MILLISECONDS); + } + } + } + } + + private class ExecutionState { + Repo<T> prevOp = null; + Repo<T> op = null; + long deferTime = 0; + TStatus status; + } + + // Executes as many steps of a fate operation as possible + private void execute(final FateTxStore<T> txStore, final ExecutionState state) + throws Exception { + while (state.op != null && state.deferTime == 0) { + var startTime = NanoTime.now(); + state.deferTime = state.op.isReady(txStore.getID(), environment); + log.debug("Running {}.isReady() {} took {} ms and returned {}", state.op.getName(), + txStore.getID(), startTime.elapsed().toMillis(), state.deferTime); + + if (state.deferTime == 0) { + if (state.status == SUBMITTED) 
{ + txStore.setStatus(IN_PROGRESS); + state.status = IN_PROGRESS; + } + + state.prevOp = state.op; + startTime = NanoTime.now(); + state.op = state.op.call(txStore.getID(), environment); + log.debug("Running {}.call() {} took {} ms and returned {}", state.prevOp.getName(), + txStore.getID(), startTime.elapsed().toMillis(), + state.op == null ? "null" : state.op.getName()); + + if (state.op != null) { + // persist the completion of this step before starting to run the next so in the case of + // process death the completed steps are not rerun + txStore.push(state.op); } } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java b/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java index 4e1beb1b9b..4c9e7c0748 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java @@ -104,7 +104,7 @@ public class FateCleaner<T> { } public void ageOff() { - store.list().filter(ids -> AGE_OFF_STATUSES.contains(ids.getStatus())) + store.list(AGE_OFF_STATUSES) .forEach(idStatus -> store.tryReserve(idStatus.getFateId()).ifPresent(txStore -> { try { AgeOffInfo ageOffInfo = readAgeOffInfo(txStore); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java index d8cecd6a47..f701f34dcc 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java @@ -20,6 +20,7 @@ package org.apache.accumulo.core.fate; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.ALL_STATUSES; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -31,6 +32,7 @@ import java.util.Collections; import java.util.List; import 
java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.function.Supplier; import java.util.stream.Stream; @@ -356,9 +358,9 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { } @Override - protected Stream<FateIdStatus> getTransactions() { + protected Stream<FateIdStatus> getTransactions(Set<TStatus> statuses) { try { - return zk.getChildren(path).stream().map(strTxid -> { + Stream<FateIdStatus> stream = zk.getChildren(path).stream().map(strTxid -> { String txUUIDStr = strTxid.split("_")[1]; FateId fateId = FateId.from(fateInstanceType, txUUIDStr); // Memoizing for two reasons. First the status may never be requested, so in that case avoid @@ -371,6 +373,12 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { } }; }); + + if (!ALL_STATUSES.equals(statuses)) { + stream = stream.filter(s -> statuses.contains(s.getStatus())); + } + + return stream; } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); } @@ -378,7 +386,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { @Override public Stream<FateKey> list(FateKey.FateKeyType type) { - return getTransactions().flatMap(fis -> getKey(fis.getFateId()).stream()) + return getTransactions(ALL_STATUSES).flatMap(fis -> getKey(fis.getFateId()).stream()) .filter(fateKey -> fateKey.getType() == type); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java index b2aa4999b2..1fd9cac06b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -19,11 +19,14 @@ package org.apache.accumulo.core.fate; import java.io.Serializable; +import java.util.Arrays; import java.util.EnumSet; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; 
import java.util.function.Consumer; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.accumulo.core.util.Pair; @@ -53,7 +56,10 @@ public interface ReadOnlyFateStore<T> { /** Unrecognized or unknown transaction state */ UNKNOWN, /** Transaction that is eligible to be executed */ - SUBMITTED + SUBMITTED; + + public static final Set<TStatus> ALL_STATUSES = + Arrays.stream(values()).collect(Collectors.toUnmodifiableSet()); } /** @@ -138,6 +144,13 @@ public interface ReadOnlyFateStore<T> { */ Stream<FateIdStatus> list(); + /** + * list all transaction ids in store that have a current status that is in the provided set + * + * @return all outstanding transactions, including those reserved by others. + */ + Stream<FateIdStatus> list(Set<TStatus> statuses); + /** * list transaction in the store that have a given fate key type. */ diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateStatusFilter.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateStatusFilter.java new file mode 100644 index 0000000000..e586a646c3 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateStatusFilter.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.user; + +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.ALL_STATUSES; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.iterators.Filter; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; + +public class FateStatusFilter extends Filter { + + private EnumSet<ReadOnlyFateStore.TStatus> valuesToAccept; + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, + IteratorEnvironment env) throws IOException { + super.init(source, options, env); + valuesToAccept = EnumSet.noneOf(ReadOnlyFateStore.TStatus.class); + var option = options.get("statuses"); + if (!option.isBlank()) { + for (var status : option.split(",")) { + valuesToAccept.add(ReadOnlyFateStore.TStatus.valueOf(status)); + } + } + } + + @Override + public boolean accept(Key k, Value v) { + var tstatus = ReadOnlyFateStore.TStatus.valueOf(v.toString()); + return valuesToAccept.contains(tstatus); + } + + public static void configureScanner(ScannerBase scanner, + Set<ReadOnlyFateStore.TStatus> statuses) { + // only filter when getting a subset of statuses + if (!statuses.equals(ALL_STATUSES)) { + String statusesStr = statuses.stream().map(Enum::name).collect(Collectors.joining(",")); + var iterSettings = new IteratorSetting(100, "statuses", FateStatusFilter.class); + iterSettings.addOption("statuses", statusesStr); + 
scanner.addScanIterator(iterSettings); + } + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index 0ae7ce892e..75fdeca989 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; @@ -151,10 +152,11 @@ public class UserFateStore<T> extends AbstractFateStore<T> { } @Override - protected Stream<FateIdStatus> getTransactions() { + protected Stream<FateIdStatus> getTransactions(Set<TStatus> statuses) { try { Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY); scanner.setRange(new Range()); + FateStatusFilter.configureScanner(scanner, statuses); TxColumnFamily.STATUS_COLUMN.fetch(scanner); return scanner.stream().onClose(scanner::close).map(e -> { String txUUIDStr = e.getKey().getRow().toString(); diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index a3c1b9cfd9..f1b86e32e8 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -20,6 +20,7 @@ package org.apache.accumulo.core.logging; import java.io.Serializable; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; @@ -120,6 +121,11 @@ public class FateLogger { return store.list(); } + @Override + public Stream<FateIdStatus> list(Set<TStatus> statuses) { + return store.list(statuses); + } + @Override public Stream<FateKey> 
list(FateKey.FateKeyType type) { return store.list(type); diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index db2d7da770..1098a44c26 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -216,6 +216,11 @@ public class TestStore implements FateStore<String> { }); } + @Override + public Stream<FateIdStatus> list(Set<TStatus> statuses) { + return list().filter(fis -> statuses.contains(fis.getStatus())); + } + @Override public Stream<FateKey> list(FateKey.FateKeyType type) { throw new UnsupportedOperationException(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java index 81f2876db5..525099409e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java @@ -25,6 +25,7 @@ import java.util.function.Predicate; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.core.fate.user.schema.FateSchema; import org.apache.accumulo.core.iterators.user.VersioningIterator; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; @@ -85,6 +86,11 @@ class InitialConfiguration { initialFateTableConf.putAll(commonConfig); initialFateTableConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "256M"); + // Create a locality group that contains status so its fast to scan. When fate looks for work is + // scans this family. 
+ initialFateTableConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "status", + FateSchema.TxColumnFamily.STR_NAME); + initialFateTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "status"); int max = hadoopConf.getInt("dfs.replication.max", 512); // Hadoop 0.23 switched the min value configuration name diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java index 2bf5007d8c..906e953f45 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java @@ -69,11 +69,6 @@ public class PreSplit extends ManagerRepo { public long isReady(FateId fateId, Manager manager) throws Exception { var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); - // ELASTICITY_TODO does FATE prioritize running Fate txs that have already started? If not would - // be good to look into this so we can finish things that are started before running new txs - // that have not completed their first step. Once splits starts running, would like it to move - // through as quickly as possible. - var tabletMetadata = manager.getContext().getAmple().readTablet(splitInfo.getOriginal(), PREV_ROW, LOCATION, OPID, LOGS); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateInterleavingIT.java new file mode 100644 index 0000000000..170d64d431 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateInterleavingIT.java @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.AbstractMap; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.Fate; 
+import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.server.ServerContext; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.Iterators; + +public abstract class FateInterleavingIT extends SharedMiniClusterBase + implements FateTestRunner<FateInterleavingIT.FilTestEnv> { + + public static class FilTestEnv extends TestEnv { + private final AccumuloClient client; + + public FilTestEnv(AccumuloClient client) { + this.client = client; + } + + AccumuloClient getClient() { + return client; + } + } + + public static class FirstOp implements Repo<FateInterleavingIT.FilTestEnv> { + + private static final long serialVersionUID = 1L; + + protected boolean isTrackingDataSet(FateId tid, FilTestEnv env, String step) throws Exception { + try (Scanner scanner = env.getClient().createScanner(FATE_TRACKING_TABLE)) { + return scanner.stream() + .anyMatch(e -> e.getKey().getColumnFamily().toString().equals(tid.canonical()) + && e.getValue().toString().equals(step)); + } + } + + protected static void insertTrackingData(FateId tid, FilTestEnv env, String step) + throws TableNotFoundException, MutationsRejectedException { + try (BatchWriter bw = env.getClient().createBatchWriter(FATE_TRACKING_TABLE)) { + Mutation mut = new Mutation(Long.toString(System.currentTimeMillis())); + mut.put(tid.canonical(), "", step); + bw.addMutation(mut); + } + } + + @Override + public long isReady(FateId tid, FilTestEnv env) throws Exception { + Thread.sleep(50); + var step = this.getName() + "::isReady"; + if (isTrackingDataSet(tid, env, step)) { + return 0; + } else { + insertTrackingData(tid, env, step); + 
return 100; + } + } + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public Repo<FilTestEnv> call(FateId tid, FilTestEnv env) throws Exception { + Thread.sleep(50); + insertTrackingData(tid, env, this.getName() + "::call"); + return new SecondOp(); + } + + @Override + public void undo(FateId fateId, FilTestEnv environment) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public String getReturn() { + return ""; + } + } + + public static class SecondOp extends FirstOp { + private static final long serialVersionUID = 1L; + + @Override + public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws Exception { + super.call(tid, environment); + return new LastOp(); + } + + } + + public static class LastOp extends FirstOp { + private static final long serialVersionUID = 1L; + + @Override + public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws Exception { + super.call(tid, environment); + return null; + } + } + + private static final String FATE_TRACKING_TABLE = "fate_tracking"; + + @BeforeAll + public static void setup() throws Exception { + SharedMiniClusterBase.startMiniCluster(); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + NewTableConfiguration ntc = new NewTableConfiguration(); + ntc.withInitialTabletAvailability(TabletAvailability.HOSTED); + client.tableOperations().create(FATE_TRACKING_TABLE, ntc); + } + } + + @AfterAll + public static void teardown() throws Exception { + SharedMiniClusterBase.stopMiniCluster(); + } + + @BeforeEach + public void before() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().deleteRows(FATE_TRACKING_TABLE, null, null); + } + } + + private void waitFor(FateStore<FilTestEnv> store, FateId txid) throws Exception { + while (store.read(txid).getStatus() != SUCCESSFUL) { + Thread.sleep(50); + } + } + + 
protected Fate<FilTestEnv> initializeFate(AccumuloClient client, FateStore<FilTestEnv> store) { + ConfigurationCopy config = new ConfigurationCopy(); + config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); + config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); + return new Fate<>(new FilTestEnv(client), store, r -> r + "", config); + } + + private static Entry<String,String> toIdStep(Entry<Key,Value> e) { + return new AbstractMap.SimpleImmutableEntry<>(e.getKey().getColumnFamily().toString(), + e.getValue().toString()); + } + + @Test + public void testInterleaving() throws Exception { + executeTest(this::testInterleaving); + } + + protected void testInterleaving(FateStore<FilTestEnv> store, ServerContext sctx) + throws Exception { + + // This test verifies that fates will interleave in time when their isReady() returns >0 and + // then 0. + + FateId[] fateIds = new FateId[3]; + + for (int i = 0; i < 3; i++) { + fateIds[i] = store.create(); + var txStore = store.reserve(fateIds[i]); + try { + txStore.push(new FirstOp()); + txStore.setTransactionInfo(TxInfo.TX_NAME, "TEST_" + i); + txStore.setStatus(SUBMITTED); + } finally { + txStore.unreserve(0, TimeUnit.SECONDS); + } + } + + Fate<FilTestEnv> fate = null; + + // The execution order of the transactions is not according to their insertion + // order. However, we do know that the first step of each transaction will be + // executed before the second steps. 
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + fate = initializeFate(client, store); + + for (var fateId : fateIds) { + waitFor(store, fateId); + } + + var expectedIds = + Set.of(fateIds[0].canonical(), fateIds[1].canonical(), fateIds[2].canonical()); + + Scanner scanner = client.createScanner(FATE_TRACKING_TABLE); + Iterator<Entry<String,String>> iter = scanner.stream().map(FateInterleavingIT::toIdStep) + .filter(e -> e.getValue().contains("::call")).iterator(); + + SortedMap<String,String> subset = new TreeMap<>(); + + Iterators.limit(iter, 3).forEachRemaining(e -> subset.put(e.getKey(), e.getValue())); + + // Should see the call() for the first steps of all three fates come first in time + assertTrue(subset.values().stream().allMatch(v -> v.startsWith("FirstOp"))); + assertEquals(expectedIds, subset.keySet()); + + subset.clear(); + + Iterators.limit(iter, 3).forEachRemaining(e -> subset.put(e.getKey(), e.getValue())); + + // Should see the call() for the second steps of all three fates come second in time + assertTrue(subset.values().stream().allMatch(v -> v.startsWith("SecondOp"))); + assertEquals(expectedIds, subset.keySet()); + + subset.clear(); + + Iterators.limit(iter, 3).forEachRemaining(e -> subset.put(e.getKey(), e.getValue())); + + // Should see the call() for the last steps of all three fates come last in time + assertTrue(subset.values().stream().allMatch(v -> v.startsWith("LastOp"))); + assertEquals(expectedIds, subset.keySet()); + + assertFalse(iter.hasNext()); + + } finally { + if (fate != null) { + fate.shutdown(10, TimeUnit.MINUTES); + } + } + } + + public static class FirstNonInterleavingOp extends FirstOp { + + private static final long serialVersionUID = 1L; + + @Override + public long isReady(FateId tid, FilTestEnv env) throws Exception { + Thread.sleep(50); + insertTrackingData(tid, env, this.getName() + "::isReady"); + return 0; + } + + @Override + public Repo<FilTestEnv> call(FateId tid, 
FilTestEnv manager) throws Exception { + Thread.sleep(50); + insertTrackingData(tid, manager, this.getName() + "::call"); + return new SecondNonInterleavingOp(); + } + } + + public static class SecondNonInterleavingOp extends FirstNonInterleavingOp { + + private static final long serialVersionUID = 1L; + + @Override + public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws Exception { + super.call(tid, environment); + return new LastNonInterleavingOp(); + } + + } + + public static class LastNonInterleavingOp extends FirstNonInterleavingOp { + + private static final long serialVersionUID = 1L; + + @Override + public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws Exception { + super.call(tid, environment); + return null; + } + + } + + @Test + public void testNonInterleaving() throws Exception { + executeTest(this::testNonInterleaving); + } + + protected void testNonInterleaving(FateStore<FilTestEnv> store, ServerContext sctx) + throws Exception { + + // This test ensures that when isReady() always returns zero that all the fate steps will + // execute immediately + + FateId[] fateIds = new FateId[3]; + + for (int i = 0; i < 3; i++) { + fateIds[i] = store.create(); + var txStore = store.reserve(fateIds[i]); + try { + txStore.push(new FirstNonInterleavingOp()); + txStore.setTransactionInfo(TxInfo.TX_NAME, "TEST_" + i); + txStore.setStatus(SUBMITTED); + } finally { + txStore.unreserve(0, TimeUnit.SECONDS); + } + } + + Fate<FilTestEnv> fate = null; + + // The execution order of the transactions is not according to their insertion + // order. 
In this case, without interleaving, a transaction will run start to finish + // before moving on to the next transaction + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + fate = initializeFate(client, store); + + for (var fateId : fateIds) { + waitFor(store, fateId); + } + + Scanner scanner = client.createScanner(FATE_TRACKING_TABLE); + Iterator<Entry<Key,Value>> iter = scanner.iterator(); + + SortedMap<Key,Value> subset = new TreeMap<>(); + + // should see one fate op execute all of it steps + var seenId1 = verifySameIds(iter, subset); + // should see another fate op execute all of it steps + var seenId2 = verifySameIds(iter, subset); + // should see another fate op execute all of it steps + var seenId3 = verifySameIds(iter, subset); + + assertEquals(Set.of(fateIds[0], fateIds[1], fateIds[2]), Set.of(seenId1, seenId2, seenId3)); + + assertFalse(iter.hasNext()); + + } finally { + if (fate != null) { + fate.shutdown(10, TimeUnit.MINUTES); + } + } + } + + private FateId verifySameIds(Iterator<Entry<Key,Value>> iter, SortedMap<Key,Value> subset) { + subset.clear(); + Iterators.limit(iter, 6).forEachRemaining(e -> subset.put(e.getKey(), e.getValue())); + + Text fateId = subset.keySet().iterator().next().getColumnFamily(); + assertTrue(subset.keySet().stream().allMatch(k -> k.getColumnFamily().equals(fateId))); + + var expectedVals = Set.of("FirstNonInterleavingOp::isReady", "FirstNonInterleavingOp::call", + "SecondNonInterleavingOp::isReady", "SecondNonInterleavingOp::call", + "LastNonInterleavingOp::isReady", "LastNonInterleavingOp::call"); + var actualVals = subset.values().stream().map(Value::toString).collect(Collectors.toSet()); + assertEquals(expectedVals, actualVals); + + return FateId.from(fateId.toString()); + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java index 8a705735d3..f28299b02a 100644 --- 
a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java @@ -65,6 +65,7 @@ import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; import com.google.common.base.Throwables; +import com.google.common.collect.Sets; public abstract class FateStoreIT extends SharedMiniClusterBase implements FateTestRunner<TestEnv> { @@ -253,6 +254,44 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT } } + @Test + public void testListStatus() throws Exception { + executeTest(this::testListStatus); + } + + protected void testListStatus(FateStore<TestEnv> store, ServerContext sctx) throws Exception { + try { + Map<FateId,TStatus> expectedStatus = new HashMap<>(); + + for (int i = 0; i < 5; i++) { + for (var status : TStatus.values()) { + var fateId = store.create(); + var txStore = store.reserve(fateId); + txStore.setStatus(status); + txStore.unreserve(0, TimeUnit.SECONDS); + expectedStatus.put(fateId, status); + } + } + + for (var statuses : Sets.powerSet(Set.of(TStatus.values()))) { + var expected = + expectedStatus.entrySet().stream().filter(e -> statuses.contains(e.getValue())) + .map(Map.Entry::getKey).collect(Collectors.toSet()); + var actual = store.list(statuses).map(FateIdStatus::getFateId).collect(Collectors.toSet()); + assertEquals(expected, actual); + } + } finally { + // Cleanup so we don't interfere with other tests + // All stores should already be unreserved + store.list().forEach(fateIdStatus -> { + var txStore = store.tryReserve(fateIdStatus.getFateId()).orElseThrow(); + txStore.setStatus(TStatus.SUCCESSFUL); + txStore.delete(); + txStore.unreserve(0, TimeUnit.SECONDS); + }); + } + } + @Test public void testCreateWithKey() throws Exception { executeTest(this::testCreateWithKey); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java new file mode 100644 index 0000000000..0ba14f730e --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test.fate.meta; + +import java.util.UUID; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.MetaFateStore; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.fate.FateInterleavingIT; + +public class MetaFateInterleavingIT extends FateInterleavingIT { + + // put the fate data for the test in a different location than what accumulo is using + private static final String ZK_ROOT = "/accumulo/" + UUID.randomUUID(); + + @Override + public void executeTest(FateTestExecutor<FilTestEnv> testMethod, int maxDeferred, + AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { + ServerContext sctx = getCluster().getServerContext(); + String path = ZK_ROOT + Constants.ZFATE; + ZooReaderWriter zk = sctx.getZooReaderWriter(); + zk.mkdirs(ZK_ROOT); + testMethod.execute(new MetaFateStore<>(path, zk), sctx); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java new file mode 100644 index 0000000000..afd4d8ac5a --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.user; + +import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.test.fate.FateInterleavingIT; + +public class UserFateInterleavingIT extends FateInterleavingIT { + @Override + public void executeTest(FateTestExecutor<FilTestEnv> testMethod, int maxDeferred, + AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { + var table = getUniqueNames(1)[0]; + try (ClientContext client = + (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + createFateTable(client, table); + testMethod.execute(new UserFateStore<>(client, table, maxDeferred, fateIdGenerator), + getCluster().getServerContext()); + client.tableOperations().delete(table); + } + } +}