This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 9550da7c838994f9f6e9547819566b01e4b1dc6e
Merge: ae49a7ff02 9839e4d42f
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Mar 14 10:29:24 2024 -0400

    Merge branch 'main' into elasticity

 .../accumulo/core/fate/AbstractFateStore.java      | 12 ++++++-----
 .../org/apache/accumulo/core/fate/AdminUtil.java   |  6 +++---
 .../java/org/apache/accumulo/core/fate/Fate.java   | 15 +++++++-------
 .../org/apache/accumulo/core/fate/FateCleaner.java |  3 ++-
 .../org/apache/accumulo/core/fate/FateStore.java   |  6 +++---
 .../accumulo/core/fate/WrappedFateTxStore.java     |  6 +++---
 .../apache/accumulo/core/fate/FateCleanerTest.java | 21 ++++++++++----------
 .../org/apache/accumulo/core/fate/TestStore.java   |  4 ++--
 .../test/compaction/ExternalCompaction_1_IT.java   |  6 +++---
 .../accumulo/test/fate/accumulo/FateStoreIT.java   | 23 +++++++++++-----------
 10 files changed, 53 insertions(+), 49 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
index 0cad25f857,0000000000..d805b230b2
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@@ -1,494 -1,0 +1,496 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.accumulo.core.fate; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.time.Duration; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; ++import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.stream.Stream; + +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.time.NanoTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +public abstract class AbstractFateStore<T> implements FateStore<T> { + + private static final Logger log = LoggerFactory.getLogger(AbstractFateStore.class); + + // Default maximum size of 100,000 transactions before deferral is stopped and + // all existing transactions are processed immediately again + public static final int DEFAULT_MAX_DEFERRED = 100_000; + + public static final FateIdGenerator DEFAULT_FATE_ID_GENERATOR = new FateIdGenerator() { + @Override + public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) { + HashCode hashCode = Hashing.murmur3_128().hashBytes(fateKey.getSerialized()); + long tid = hashCode.asLong() & 0x7fffffffffffffffL; + return FateId.from(instanceType, tid); + } + }; + + protected final Set<FateId> reserved; + protected final Map<FateId,NanoTime> deferred; + private final int maxDeferred; + private final AtomicBoolean deferredOverflow = new AtomicBoolean(); + private final FateIdGenerator fateIdGenerator; + + // This is incremented each time a transaction was unreserved that was non new + protected final SignalCount unreservedNonNewCount = new SignalCount(); + + // This is incremented each time a transaction is unreserved that was runnable + protected final SignalCount unreservedRunnableCount = new SignalCount(); + + public AbstractFateStore() { + this(DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); + } + + public AbstractFateStore(int maxDeferred, FateIdGenerator fateIdGenerator) { + this.maxDeferred = maxDeferred; + this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator); + this.reserved = new HashSet<>(); + this.deferred = new HashMap<>(); + } + + public static byte[] serialize(Object o) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(o); + return baos.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @SuppressFBWarnings(value = "OBJECT_DESERIALIZATION", + justification = "unsafe to store arbitrary serialized objects like this, but needed for now" + + " for backwards compatibility") + public static Object deserialize(byte[] ser) { + try (ByteArrayInputStream bais = new ByteArrayInputStream(ser); + ObjectInputStream ois = new ObjectInputStream(bais)) { + return ois.readObject(); + } catch (IOException e) { + throw new 
UncheckedIOException(e); + } catch (ReflectiveOperationException e) { + throw new IllegalStateException(e); + } + } + + /** + * Attempt to reserve the fate transaction. + * + * @param fateId The FateId + * @return An Optional containing the FateTxStore if the transaction was successfully reserved, or + * an empty Optional if the transaction was already reserved. + */ + @Override + public Optional<FateTxStore<T>> tryReserve(FateId fateId) { + synchronized (this) { + if (!reserved.contains(fateId)) { + return Optional.of(reserve(fateId)); + } + return Optional.empty(); + } + } + + @Override + public FateTxStore<T> reserve(FateId fateId) { + synchronized (AbstractFateStore.this) { + while (reserved.contains(fateId)) { + try { + AbstractFateStore.this.wait(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + + reserved.add(fateId); + return newFateTxStore(fateId, true); + } + } + + @Override + public void runnable(AtomicBoolean keepWaiting, Consumer<FateId> idConsumer) { + + AtomicLong seen = new AtomicLong(0); + + while (keepWaiting.get() && seen.get() == 0) { + final long beforeCount = unreservedRunnableCount.getCount(); + final boolean beforeDeferredOverflow = deferredOverflow.get(); + + try (Stream<FateIdStatus> transactions = getTransactions()) { + transactions.filter(fateIdStatus -> isRunnable(fateIdStatus.getStatus())) + .map(FateIdStatus::getFateId).filter(fateId -> { + synchronized (AbstractFateStore.this) { + var deferredTime = deferred.get(fateId); + if (deferredTime != null) { + if (deferredTime.elapsed().isNegative()) { - // negative elapsed time indicates the deferal time is in the future ++ // negative elapsed time indicates the deferral time is in the future + return false; + } else { + deferred.remove(fateId); + } + } + return !reserved.contains(fateId); + } + }).forEach(fateId -> { + seen.incrementAndGet(); + idConsumer.accept(fateId); + }); + } + + // If deferredOverflow was previously marked true then the deferred map + // would have been cleared and seen.get() should be greater than 0 as there would + // be a lot of transactions to process in the previous run, so we won't be sleeping here + if (seen.get() == 0) { + if (beforeCount == unreservedRunnableCount.getCount()) { + long waitTime = 5000; + synchronized (AbstractFateStore.this) { + if (!deferred.isEmpty()) { + var now = NanoTime.now(); + waitTime = deferred.values().stream() + .mapToLong(nanoTime -> nanoTime.subtract(now).toMillis()).min().getAsLong(); + } + } + + if (waitTime > 0) { + unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime, + keepWaiting::get); + } + } + } + + // Reset if the current state only if it matches the state before the execution. + // This is to avoid a race condition where the flag was set during the run. + // We should ensure at least one of the FATE executors will run through the + // entire transaction list first before clearing the flag and allowing more + // deferred entries into the map again. 
In other words, if the before state + // was false and during the execution at some point it was marked true this would + // not reset until after the next run + deferredOverflow.compareAndSet(beforeDeferredOverflow, false); + } + } + + @Override + public Stream<FateIdStatus> list() { + return getTransactions(); + } + + @Override + public ReadOnlyFateTxStore<T> read(FateId fateId) { + return newFateTxStore(fateId, false); + } + + protected boolean isRunnable(TStatus status) { + return status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS + || status == TStatus.SUBMITTED; + } + + public static abstract class FateIdStatusBase implements FateIdStatus { + private final FateId fateId; + + public FateIdStatusBase(FateId fateId) { + this.fateId = fateId; + } + + @Override + public FateId getFateId() { + return fateId; + } + } + + @Override + public boolean isDeferredOverflow() { + return deferredOverflow.get(); + } + + @Override + public int getDeferredCount() { + // This method is primarily used right now for unit testing but + // if this synchronization becomes an issue we could add an atomic + // counter instead to track it separately so we don't need to lock + synchronized (AbstractFateStore.this) { + return deferred.size(); + } + } + + private Optional<FateId> create(FateKey fateKey) { + FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey); + + try { + create(fateId, fateKey); + } catch (IllegalStateException e) { + Pair<TStatus,Optional<FateKey>> statusAndKey = getStatusAndKey(fateId); + TStatus status = statusAndKey.getFirst(); + Optional<FateKey> tFateKey = statusAndKey.getSecond(); + + // Case 1: Status is NEW so this is unseeded, we can return and allow the calling code + // to reserve/seed as long as the existing key is the same and not different as that would + // mean a collision + if (status == TStatus.NEW) { + Preconditions.checkState(tFateKey.isPresent(), "Tx Key is missing from tid %s", + fateId.getTid()); + Preconditions.checkState(fateKey.equals(tFateKey.orElseThrow()), + "Collision detected for tid %s", fateId.getTid()); + // Case 2: Status is some other state which means already in progress + // so we can just log and return empty optional + } else { + log.trace("Existing transaction {} already exists for key {} with status {}", fateId, + fateKey, status); + return Optional.empty(); + } + } + + return Optional.of(fateId); + } + + @Override + public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) { + FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey); + final Optional<FateTxStore<T>> txStore; + + // First make sure we can reserve in memory the fateId, if not + // we can return an empty Optional as it is reserved and in progress + // This reverses the usual order of creation and then reservation but + // this prevents a race condition by ensuring we can reserve first. + // This will create the FateTxStore before creation but this object + // is not exposed until after creation is finished so there should not + // be any errors. 
+ final Optional<FateTxStore<T>> reservedTxStore; + synchronized (this) { + reservedTxStore = tryReserve(fateId); + } + + // If present we were able to reserve so try and create + if (reservedTxStore.isPresent()) { + try { + var fateIdFromCreate = create(fateKey); + if (fateIdFromCreate.isPresent()) { + Preconditions.checkState(fateId.equals(fateIdFromCreate.orElseThrow()), + "Transaction creation returned unexpected %s, expected %s", fateIdFromCreate, fateId); + txStore = reservedTxStore; + } else { + // We already exist in a non-new state then un-reserve and an empty + // Optional will be returned. This is expected to happen when the + // system is busy and operations are not running, and we keep seeding them + synchronized (this) { + reserved.remove(fateId); + } + txStore = Optional.empty(); + } + } catch (Exception e) { + // Clean up the reservation if the creation failed + // And then throw error + synchronized (this) { + reserved.remove(fateId); + } + if (e instanceof IllegalStateException) { + throw e; + } else { + throw new IllegalStateException(e); + } + } + } else { + // Could not reserve so return empty + log.trace("Another thread currently has transaction {} key {} reserved", fateId, fateKey); + txStore = Optional.empty(); + } + + return txStore; + } + + protected abstract void create(FateId fateId, FateKey fateKey); + + protected abstract Pair<TStatus,Optional<FateKey>> getStatusAndKey(FateId fateId); + + protected abstract Stream<FateIdStatus> getTransactions(); + + protected abstract TStatus _getStatus(FateId fateId); + + protected abstract Optional<FateKey> getKey(FateId fateId); + + protected abstract FateTxStore<T> newFateTxStore(FateId fateId, boolean isReserved); + + protected abstract FateInstanceType getInstanceType(); + + protected abstract class AbstractFateTxStoreImpl<T> implements FateTxStore<T> { + protected final FateId fateId; + protected final boolean isReserved; + + protected TStatus observedStatus = null; + + protected AbstractFateTxStoreImpl(FateId fateId, boolean isReserved) { + this.fateId = fateId; + this.isReserved = isReserved; + } + + @Override + public TStatus waitForStatusChange(EnumSet<TStatus> expected) { + Preconditions.checkState(!isReserved, + "Attempted to wait for status change while reserved " + fateId); + while (true) { + + long countBefore = unreservedNonNewCount.getCount(); + + TStatus status = _getStatus(fateId); + if (expected.contains(status)) { + return status; + } + + unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () -> true); + } + } + + @Override - public void unreserve(Duration deferTime) { ++ public void unreserve(long deferTime, TimeUnit timeUnit) { ++ Duration deferDuration = Duration.of(deferTime, timeUnit.toChronoUnit()); + - if (deferTime.isNegative()) { ++ if (deferDuration.isNegative()) { + throw new IllegalArgumentException("deferTime < 0 : " + deferTime); + } + + synchronized (AbstractFateStore.this) { + if (!reserved.remove(fateId)) { + throw new IllegalStateException("Tried to unreserve id that was not reserved " + fateId); + } + + // notify any threads waiting to reserve + AbstractFateStore.this.notifyAll(); + + // If deferred map has overflowed then skip adding to the deferred map + // and clear the map and set the flag. 
This will cause the next execution + // of runnable to process all the transactions and to not defer as we + // have a large backlog and want to make progress - if (deferTime.compareTo(Duration.ZERO) > 0 && !deferredOverflow.get()) { ++ if (deferDuration.compareTo(Duration.ZERO) > 0 && !deferredOverflow.get()) { + if (deferred.size() >= maxDeferred) { + log.info( + "Deferred map overflowed with size {}, clearing and setting deferredOverflow to true", + deferred.size()); + deferredOverflow.set(true); + deferred.clear(); + } else { - deferred.put(fateId, NanoTime.nowPlus(deferTime)); ++ deferred.put(fateId, NanoTime.nowPlus(deferDuration)); + } + } + } + + if (observedStatus != null && isRunnable(observedStatus)) { + unreservedRunnableCount.increment(); + } + + if (observedStatus != TStatus.NEW) { + unreservedNonNewCount.increment(); + } + } + + protected void verifyReserved(boolean isWrite) { + if (!isReserved && isWrite) { + throw new IllegalStateException("Attempted write on unreserved FATE transaction."); + } + + if (isReserved) { + synchronized (AbstractFateStore.this) { + if (!reserved.contains(fateId)) { + throw new IllegalStateException("Tried to operate on unreserved transaction " + fateId); + } + } + } + } + + @Override + public TStatus getStatus() { + verifyReserved(false); + var status = _getStatus(fateId); + observedStatus = status; + return status; + } + + @Override + public Optional<FateKey> getKey() { + verifyReserved(false); + return AbstractFateStore.this.getKey(fateId); + } + + @Override + public Pair<TStatus,Optional<FateKey>> getStatusAndKey() { + verifyReserved(false); + return AbstractFateStore.this.getStatusAndKey(fateId); + } + + @Override + public FateId getID() { + return fateId; + } + } + + public interface FateIdGenerator { + FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey); + } + + protected byte[] serializeTxInfo(Serializable so) { + if (so instanceof String) { + return ("S " + so).getBytes(UTF_8); + } else { + byte[] sera = serialize(so); + byte[] data = new byte[sera.length + 2]; + System.arraycopy(sera, 0, data, 2, sera.length); + data[0] = 'O'; + data[1] = ' '; + return data; + } + } + + protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) { + if (data[0] == 'O') { + byte[] sera = new byte[data.length - 2]; + System.arraycopy(data, 2, sera, 0, sera.length); + return (Serializable) deserialize(sera); + } else if (data[0] == 'S') { + return new String(data, 2, data.length - 2, UTF_8); + } else { + throw new IllegalStateException("Bad node data " + txInfo); + } + } +} diff --cc core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index f2e6da41e3,858e6e6998..c18defb1ac --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@@ -33,12 -32,9 +32,13 @@@ import java.util.List import java.util.Map; import java.util.Map.Entry; import java.util.Set; + import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; -import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; +import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.zookeeper.FateLock; import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockPath; import org.apache.accumulo.core.fate.zookeeper.ZooReader; @@@ 
-458,32 -432,26 +458,32 @@@ public class AdminUtil<T> return false; } boolean state = false; - zs.reserve(txid); - TStatus ts = zs.getStatus(txid); - switch (ts) { - case UNKNOWN: - System.out.printf("Invalid transaction ID: %016x%n", txid); - break; - - case SUBMITTED: - case IN_PROGRESS: - case NEW: - case FAILED: - case FAILED_IN_PROGRESS: - case SUCCESSFUL: - System.out.printf("Deleting transaction: %016x (%s)%n", txid, ts); - zs.delete(txid); - state = true; - break; - } - zs.unreserve(txid, 0, TimeUnit.MILLISECONDS); + // determine which store to use + FateStore<T> store = stores.get(fateId.getType()); + + FateTxStore<T> txStore = store.reserve(fateId); + try { + TStatus ts = txStore.getStatus(); + switch (ts) { + case UNKNOWN: + System.out.println("Invalid transaction ID: " + fateId); + break; + + case SUBMITTED: + case IN_PROGRESS: + case NEW: + case FAILED: + case FAILED_IN_PROGRESS: + case SUCCESSFUL: + System.out.printf("Deleting transaction: %s (%s)%n", fateIdStr, ts); + txStore.delete(); + state = true; + break; + } + } finally { - txStore.unreserve(Duration.ZERO); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); + } return state; } @@@ -501,40 -469,33 +501,40 @@@ return false; } boolean state = false; - zs.reserve(txid); - TStatus ts = zs.getStatus(txid); - switch (ts) { - case UNKNOWN: - System.out.printf("Invalid transaction ID: %016x%n", txid); - break; - - case SUBMITTED: - case IN_PROGRESS: - case NEW: - System.out.printf("Failing transaction: %016x (%s)%n", txid, ts); - zs.setStatus(txid, TStatus.FAILED_IN_PROGRESS); - state = true; - break; - - case SUCCESSFUL: - System.out.printf("Transaction already completed: %016x (%s)%n", txid, ts); - break; - - case FAILED: - case FAILED_IN_PROGRESS: - System.out.printf("Transaction already failed: %016x (%s)%n", txid, ts); - state = true; - break; + + // determine which store to use + FateStore<T> store = stores.get(fateId.getType()); + + FateTxStore<T> txStore = store.reserve(fateId); + try { + TStatus ts = txStore.getStatus(); + switch (ts) { + case UNKNOWN: + System.out.println("Invalid fate ID: " + fateId); + break; + + case SUBMITTED: + case IN_PROGRESS: + case NEW: + System.out.printf("Failing transaction: %s (%s)%n", fateId, ts); + txStore.setStatus(TStatus.FAILED_IN_PROGRESS); + state = true; + break; + + case SUCCESSFUL: + System.out.printf("Transaction already completed: %s (%s)%n", fateId, ts); + break; + + case FAILED: + case FAILED_IN_PROGRESS: + System.out.printf("Transaction already failed: %s (%s)%n", fateId, ts); + state = true; + break; + } + } finally { - txStore.unreserve(Duration.ZERO); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); } - zs.unreserve(txid, 0, TimeUnit.MILLISECONDS); return state; } diff --cc core/src/main/java/org/apache/accumulo/core/fate/Fate.java index f4e9728df3,1a14418b1a..e5be68dbb2 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@@ -19,23 -19,19 +19,22 @@@ package org.apache.accumulo.core.fate; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.FAILED; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.FAILED_IN_PROGRESS; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.IN_PROGRESS; -import static 
org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.NEW; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.SUBMITTED; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.SUCCESSFUL; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.UNKNOWN; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN; import static org.apache.accumulo.core.util.ShutdownUtil.isIOException; - import java.time.Duration; import java.util.EnumSet; +import java.util.Optional; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; @@@ -205,8 -132,8 +204,8 @@@ public class Fate<T> } catch (Exception e) { runnerLog.error("Uncaught exception in FATE runner thread.", e); } finally { - if (tid != null) { - store.unreserve(tid, deferTime, TimeUnit.MILLISECONDS); + if (txStore != null) { - txStore.unreserve(Duration.ofMillis(deferTime)); ++ txStore.unreserve(deferTime, TimeUnit.MILLISECONDS); } } } @@@ -338,55 -263,33 +337,55 @@@ return store.create(); } + public Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T> repo, + boolean autoCleanUp, String goalMessage) { + + Optional<FateTxStore<T>> optTxStore = store.createAndReserve(fateKey); + + return optTxStore.map(txStore -> { + var fateId = txStore.getID(); + try { + Preconditions.checkState(txStore.getStatus() == NEW); + seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage, txStore); + } finally { - txStore.unreserve(Duration.ZERO); ++ txStore.unreserve(0, MILLISECONDS); + } + return fateId; + }); + } + + private void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp, + String goalMessage, FateTxStore<T> txStore) { + if (txStore.top() == null) { + try { + log.info("Seeding {} {}", fateId, goalMessage); + txStore.push(repo); + } catch (StackOverflowException e) { + // this should not happen + throw new IllegalStateException(e); + } + } + + if (autoCleanUp) { + txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp); + } + + txStore.setTransactionInfo(TxInfo.TX_NAME, txName); + + txStore.setStatus(SUBMITTED); + } + // start work in the transaction.. it is safe to call this // multiple times for a transaction... 
but it will only seed once - public void seedTransaction(String txName, long tid, Repo<T> repo, boolean autoCleanUp, + public void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp, String goalMessage) { - store.reserve(tid); + FateTxStore<T> txStore = store.reserve(fateId); try { - if (store.getStatus(tid) == NEW) { - if (store.top(tid) == null) { - try { - log.info("Seeding {} {}", FateTxId.formatTid(tid), goalMessage); - store.push(tid, repo); - } catch (StackOverflowException e) { - // this should not happen - throw new IllegalStateException(e); - } - } - - if (autoCleanUp) { - store.setTransactionInfo(tid, TxInfo.AUTO_CLEAN, autoCleanUp); - } - - store.setTransactionInfo(tid, TxInfo.TX_NAME, txName); - - store.setStatus(tid, SUBMITTED); + if (txStore.getStatus() == NEW) { + seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage, txStore); } } finally { - txStore.unreserve(Duration.ZERO); - store.unreserve(tid, 0, TimeUnit.MILLISECONDS); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); } } @@@ -423,7 -325,7 +422,7 @@@ return false; } } finally { - txStore.unreserve(Duration.ZERO); - store.unreserve(tid, 0, TimeUnit.MILLISECONDS); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); } } else { // reserved, lets retry. @@@ -453,34 -356,34 +452,34 @@@ break; } } finally { - txStore.unreserve(Duration.ZERO); - store.unreserve(tid, 0, TimeUnit.MILLISECONDS); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); } } - public String getReturn(long tid) { - store.reserve(tid); + public String getReturn(FateId fateId) { + FateTxStore<T> txStore = store.reserve(fateId); try { - if (store.getStatus(tid) != SUCCESSFUL) { - throw new IllegalStateException("Tried to get exception when transaction " - + FateTxId.formatTid(tid) + " not in successful state"); + if (txStore.getStatus() != SUCCESSFUL) { + throw new IllegalStateException( + "Tried to get exception when transaction " + fateId + " not in successful state"); } - return (String) store.getTransactionInfo(tid, TxInfo.RETURN_VALUE); + return (String) txStore.getTransactionInfo(TxInfo.RETURN_VALUE); } finally { - txStore.unreserve(Duration.ZERO); - store.unreserve(tid, 0, TimeUnit.MILLISECONDS); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); } } // get reportable failures - public Exception getException(long tid) { - store.reserve(tid); + public Exception getException(FateId fateId) { + FateTxStore<T> txStore = store.reserve(fateId); try { - if (store.getStatus(tid) != FAILED) { - throw new IllegalStateException("Tried to get exception when transaction " - + FateTxId.formatTid(tid) + " not in failed state"); + if (txStore.getStatus() != FAILED) { + throw new IllegalStateException( + "Tried to get exception when transaction " + fateId + " not in failed state"); } - return (Exception) store.getTransactionInfo(tid, TxInfo.EXCEPTION); + return (Exception) txStore.getTransactionInfo(TxInfo.EXCEPTION); } finally { - txStore.unreserve(Duration.ZERO); - store.unreserve(tid, 0, TimeUnit.MILLISECONDS); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); } } diff --cc core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java index 7dd0339f24,0000000000..4e1beb1b9b mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java @@@ -1,134 -1,0 +1,135 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import java.time.Duration; +import java.util.EnumSet; +import java.util.UUID; ++import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * Removes Repos, in the Fate store it tracks, that are in a finished or new state for more than a + * configurable time period. This class stores data in the Fate store under the + * {@link org.apache.accumulo.core.fate.Fate.TxInfo#TX_AGEOFF} field. The data stored under this + * field is used to track fate transactions that are candidates for cleanup. + * + * <p> + * No external time source is used. It starts tracking idle time when its created. + * + * <p> + * The {@link #ageOff()} method on this class must be periodically called inorder to cleanup to + * happen. + */ +public class FateCleaner<T> { + + public interface TimeSource { + long currentTimeNanos(); + } + + // Statuses that can be aged off if idle for a prolonged period. + private static final EnumSet<TStatus> AGE_OFF_STATUSES = + EnumSet.of(TStatus.NEW, TStatus.FAILED, TStatus.SUCCESSFUL); + + // This is used to determine if age off data was persisted by another instance of this object. 
+ private final UUID instanceId = UUID.randomUUID(); + + private static final Logger log = LoggerFactory.getLogger(FateCleaner.class); + + private final FateStore<T> store; + + private final long ageOffTime; + private final TimeSource timeSource; + + private static class AgeOffInfo { + final UUID instanceId; + final long setTime; + final TStatus status; + + public AgeOffInfo(String ageOffStr) { + var tokens = ageOffStr.split(":"); + Preconditions.checkArgument(tokens.length == 3, "Malformed input %s", ageOffStr); + instanceId = UUID.fromString(tokens[0]); + setTime = Long.parseLong(tokens[1]); + status = TStatus.valueOf(tokens[2]); + } + + public AgeOffInfo(UUID instanceId, long time, TStatus status) { + this.instanceId = instanceId; + this.setTime = time; + this.status = status; + } + + @Override + public String toString() { + return instanceId + ":" + setTime + ":" + status; + } + } + + private AgeOffInfo readAgeOffInfo(FateTxStore<T> txStore) { + String ageOffStr = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_AGEOFF); + if (ageOffStr == null) { + return null; + } + + return new AgeOffInfo(ageOffStr); + } + + private boolean shouldAgeOff(TStatus currStatus, AgeOffInfo ageOffInfo) { + return AGE_OFF_STATUSES.contains(currStatus) && currStatus == ageOffInfo.status + && ageOffInfo.instanceId.equals(instanceId) + && timeSource.currentTimeNanos() - ageOffInfo.setTime >= ageOffTime; + } + + public void ageOff() { + store.list().filter(ids -> AGE_OFF_STATUSES.contains(ids.getStatus())) + .forEach(idStatus -> store.tryReserve(idStatus.getFateId()).ifPresent(txStore -> { + try { + AgeOffInfo ageOffInfo = readAgeOffInfo(txStore); + TStatus currStatus = txStore.getStatus(); + if (ageOffInfo == null || !ageOffInfo.instanceId.equals(instanceId) + || currStatus != ageOffInfo.status) { + // set or reset the age off info because it does not exists or it exists but is no + // longer valid + var newAgeOffInfo = + new AgeOffInfo(instanceId, timeSource.currentTimeNanos(), currStatus); + txStore.setTransactionInfo(Fate.TxInfo.TX_AGEOFF, newAgeOffInfo.toString()); + log.trace("Set age off data {} {}", idStatus.getFateId(), newAgeOffInfo); + } else if (shouldAgeOff(currStatus, ageOffInfo)) { + txStore.delete(); + log.debug("Aged off FATE tx {}", idStatus.getFateId()); + } + } finally { - txStore.unreserve(Duration.ZERO); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); + } + })); + } + + public FateCleaner(FateStore<T> store, Duration duration, TimeSource timeSource) { + this.store = store; + this.ageOffTime = duration.toNanos(); + this.timeSource = timeSource; + } +} diff --cc core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index 9aa7dcbbc4,0000000000..088e502522 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@@ -1,127 -1,0 +1,127 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import java.io.Serializable; - import java.time.Duration; +import java.util.Optional; ++import java.util.concurrent.TimeUnit; + +/** + * Transaction Store: a place to save transactions + * + * A transaction consists of a number of operations. To use, first create a fate transaction id, and + * then seed the transaction with an initial operation. An executor service can then execute the + * transaction's operation, possibly pushing more operations onto the transaction as each step + * successfully completes. If a step fails, the stack can be unwound, undoing each operation. + */ +public interface FateStore<T> extends ReadOnlyFateStore<T> { + + /** + * Create a new fate transaction id + * + * @return a new FateId + */ + FateId create(); + + /** + * Creates and reserves a transaction using the given key. If something is already running for the + * given key, then Optional.empty() will be returned. When this returns a non-empty id, it will be + * in the new state. + * + * <p> + * In the case where a process dies in the middle of a call to this. If later, another call is + * made with the same key and its in the new state then the FateId for that key will be returned. + * </p> + * + * @throws IllegalStateException when there is an unexpected collision. This can occur if two key + * hash to the same FateId or if a random FateId already exists. + */ + Optional<FateTxStore<T>> createAndReserve(FateKey fateKey); + + /** + * An interface that allows read/write access to the data related to a single fate operation. + */ + interface FateTxStore<T> extends ReadOnlyFateTxStore<T> { + @Override + Repo<T> top(); + + /** + * Update the given transaction with the next operation + * + * @param repo the operation + */ + void push(Repo<T> repo) throws StackOverflowException; + + /** + * Remove the last pushed operation from the given transaction. + */ + void pop(); + + /** + * Update the state of a given transaction + * + * @param status execution status + */ + void setStatus(TStatus status); + + /** + * Set transaction-specific information. + * + * @param txInfo name of attribute of a transaction to set. + * @param val transaction data to store + */ + void setTransactionInfo(Fate.TxInfo txInfo, Serializable val); + + /** + * Remove the transaction from the store. + * + */ + void delete(); + + /** + * Return the given transaction to the store. + * + * upon successful return the store now controls the referenced transaction id. caller should no + * longer interact with it. + * - * @param deferTime time to keep this transaction from being returned by ++ * @param deferTime time in millis to keep this transaction from being returned by + * {@link #runnable(java.util.concurrent.atomic.AtomicBoolean, java.util.function.Consumer)}. + * Must be non-negative. + */ - void unreserve(Duration deferTime); ++ void unreserve(long deferTime, TimeUnit timeUnit); + } + + /** + * Attempt to reserve the fate transaction. 
+ * + * @param fateId The FateId + * @return true if reserved by this call, false if already reserved + */ + Optional<FateTxStore<T>> tryReserve(FateId fateId); + + /** + * Reserve the fate transaction. + * + * Reserving a fate transaction ensures that nothing else in-process interacting via the same + * instance will be operating on that fate transaction. + * + */ + FateTxStore<T> reserve(FateId fateId); + +} diff --cc core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java index ac5147d4a9,0000000000..031a3ece02 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java @@@ -1,111 -1,0 +1,111 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import java.io.Serializable; - import java.time.Duration; +import java.util.EnumSet; +import java.util.List; +import java.util.Optional; ++import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.util.Pair; + +public class WrappedFateTxStore<T> implements FateStore.FateTxStore<T> { + protected final FateStore.FateTxStore<T> wrapped; + + public WrappedFateTxStore(FateStore.FateTxStore<T> wrapped) { + this.wrapped = wrapped; + } + + @Override - public void unreserve(Duration deferTime) { - wrapped.unreserve(deferTime); ++ public void unreserve(long deferTime, TimeUnit timeUnit) { ++ wrapped.unreserve(deferTime, timeUnit); + } + + @Override + public Repo<T> top() { + return wrapped.top(); + } + + @Override + public void push(Repo<T> repo) throws StackOverflowException { + wrapped.push(repo); + } + + @Override + public void pop() { + wrapped.pop(); + } + + @Override + public FateStore.TStatus getStatus() { + return wrapped.getStatus(); + } + + @Override + public Optional<FateKey> getKey() { + return wrapped.getKey(); + } + + @Override + public Pair<TStatus,Optional<FateKey>> getStatusAndKey() { + return wrapped.getStatusAndKey(); + } + + @Override + public void setStatus(FateStore.TStatus status) { + wrapped.setStatus(status); + } + + @Override + public FateStore.TStatus waitForStatusChange(EnumSet<FateStore.TStatus> expected) { + return wrapped.waitForStatusChange(expected); + } + + @Override + public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) { + wrapped.setTransactionInfo(txInfo, val); + } + + @Override + public Serializable getTransactionInfo(Fate.TxInfo txInfo) { + return wrapped.getTransactionInfo(txInfo); + } + + @Override + public void delete() { + wrapped.delete(); + } + + @Override + public long timeCreated() { + return wrapped.timeCreated(); + } + + @Override + public FateId getID() { + return 
wrapped.getID(); + } + + @Override + public List<ReadOnlyRepo<T>> getStack() { + return wrapped.getStack(); + } +} diff --cc core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java index eb0d1dc748,0000000000..1a5a4fb708 mode 100644,000000..100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java @@@ -1,273 -1,0 +1,274 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import static java.util.stream.Collectors.toSet; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.time.Duration; +import java.util.Set; ++import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.fate.FateCleaner.TimeSource; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.zookeeper.KeeperException; +import org.junit.jupiter.api.Test; + +public class FateCleanerTest { + + private static class TestTimeSource implements TimeSource { + long time = 0; + + @Override + public long currentTimeNanos() { + return time; + } + + } + + @Test + public void testBasic() throws InterruptedException, KeeperException { + + TestTimeSource tts = new TestTimeSource(); + TestStore testStore = new TestStore(); + FateCleaner<String> cleaner = new FateCleaner<>(testStore, Duration.ofNanos(10), tts); + + cleaner.ageOff(); + + FateId fateId1 = testStore.create(); + var txStore1 = testStore.reserve(fateId1); + txStore1.setStatus(TStatus.IN_PROGRESS); - txStore1.unreserve(Duration.ZERO); ++ txStore1.unreserve(0, TimeUnit.MILLISECONDS); + + cleaner.ageOff(); + + FateId fateId2 = testStore.create(); + var txStore2 = testStore.reserve(fateId2); + txStore2.setStatus(TStatus.IN_PROGRESS); + txStore2.setStatus(TStatus.FAILED); - txStore2.unreserve(Duration.ZERO); ++ txStore2.unreserve(0, TimeUnit.MILLISECONDS); + + cleaner.ageOff(); + + tts.time = 6; + + FateId fateId3 = testStore.create(); + var txStore3 = testStore.reserve(fateId3); + txStore3.setStatus(TStatus.IN_PROGRESS); + txStore3.setStatus(TStatus.SUCCESSFUL); - txStore3.unreserve(Duration.ZERO); ++ txStore3.unreserve(0, TimeUnit.MILLISECONDS); + + cleaner.ageOff(); + + FateId fateId4 = testStore.create(); + + cleaner.ageOff(); + + assertEquals(Set.of(fateId1, fateId2, fateId3, fateId4), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + + tts.time = 15; + + cleaner.ageOff(); + + assertEquals(Set.of(fateId1, fateId3, fateId4), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + + tts.time = 30; + + cleaner.ageOff(); + + assertEquals(Set.of(fateId1), 
testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + } + + @Test + public void testNonEmpty() { + // test age off when source store starts off non empty + + TestTimeSource tts = new TestTimeSource(); + TestStore testStore = new TestStore(); + FateId fateId1 = testStore.create(); + var txStore1 = testStore.reserve(fateId1); + txStore1.setStatus(TStatus.IN_PROGRESS); - txStore1.unreserve(Duration.ZERO); ++ txStore1.unreserve(0, TimeUnit.MILLISECONDS); + + FateId fateId2 = testStore.create(); + var txStore2 = testStore.reserve(fateId2); + txStore2.setStatus(TStatus.IN_PROGRESS); + txStore2.setStatus(TStatus.FAILED); - txStore2.unreserve(Duration.ZERO); ++ txStore2.unreserve(0, TimeUnit.MILLISECONDS); + + FateId fateId3 = testStore.create(); + var txStore3 = testStore.reserve(fateId3); + txStore3.setStatus(TStatus.IN_PROGRESS); + txStore3.setStatus(TStatus.SUCCESSFUL); - txStore3.unreserve(Duration.ZERO); ++ txStore3.unreserve(0, TimeUnit.MILLISECONDS); + + FateId fateId4 = testStore.create(); + + FateCleaner<String> cleaner = new FateCleaner<>(testStore, Duration.ofNanos(10), tts); + cleaner.ageOff(); + + assertEquals(Set.of(fateId1, fateId2, fateId3, fateId4), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + + cleaner.ageOff(); + + assertEquals(Set.of(fateId1, fateId2, fateId3, fateId4), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + + tts.time = 15; + + cleaner.ageOff(); + + assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + + txStore1 = testStore.reserve(fateId1); + txStore1.setStatus(TStatus.FAILED_IN_PROGRESS); - txStore1.unreserve(Duration.ZERO); ++ txStore1.unreserve(0, TimeUnit.MILLISECONDS); + + tts.time = 30; + + cleaner.ageOff(); + + assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + + txStore1 = testStore.reserve(fateId1); + txStore1.setStatus(TStatus.FAILED); - txStore1.unreserve(Duration.ZERO); ++ txStore1.unreserve(0, TimeUnit.MILLISECONDS); + + cleaner.ageOff(); + + assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + + tts.time = 42; + + cleaner.ageOff(); + + assertEquals(0, testStore.list().count()); + } + + @Test + public void testStatusChange() { + // test ensure that if something is eligible for ageoff and its status changes it will no longer + // be eligible + + TestTimeSource tts = new TestTimeSource(); + TestStore testStore = new TestStore(); + FateCleaner<String> cleaner = new FateCleaner<>(testStore, Duration.ofHours(10), tts); + + cleaner.ageOff(); + + // create a something in the NEW state + FateId fateId1 = testStore.create(); + + // create another that is complete + FateId fateId2 = testStore.create(); + var txStore2 = testStore.reserve(fateId2); + txStore2.setStatus(TStatus.IN_PROGRESS); + txStore2.setStatus(TStatus.FAILED); - txStore2.unreserve(Duration.ZERO); ++ txStore2.unreserve(0, TimeUnit.MILLISECONDS); + + // create another in the NEW state + FateId fateId3 = testStore.create(); + + // start tracking what can age off, both should be candidates + cleaner.ageOff(); + assertEquals(Set.of(fateId1, fateId2, fateId3), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + + // advance time by 9 hours, nothing should age off + tts.time += Duration.ofHours(9).toNanos(); + cleaner.ageOff(); + + assertEquals(Set.of(fateId1, fateId2, fateId3), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + + var txStore1 = testStore.reserve(fateId1); + 
txStore1.setStatus(TStatus.IN_PROGRESS); + txStore1.setStatus(TStatus.FAILED); - txStore1.unreserve(Duration.ZERO); ++ txStore1.unreserve(0, TimeUnit.MILLISECONDS); + + // advance time by 2 hours, both should be able to age off.. however the status changed on txid1 + // so it should not age off + tts.time += Duration.ofHours(2).toNanos(); + cleaner.ageOff(); + + assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + + // advance time by 9 hours, nothing should age off + tts.time += Duration.ofHours(9).toNanos(); + cleaner.ageOff(); + assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + + // advance time by 2 hours, should age off everything + tts.time += Duration.ofHours(2).toNanos(); + cleaner.ageOff(); + assertEquals(Set.of(), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + } + + @Test + public void testNewCleaner() { + // this test ensures that a new cleaner instance ignores data from another cleaner instance + + TestTimeSource tts = new TestTimeSource(); + TestStore testStore = new TestStore(); + FateCleaner<String> cleaner1 = new FateCleaner<>(testStore, Duration.ofHours(10), tts); + + FateId fateId1 = testStore.create(); + + cleaner1.ageOff(); + assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + + tts.time += Duration.ofHours(5).toNanos(); + FateId fateId2 = testStore.create(); + + cleaner1.ageOff(); + assertEquals(Set.of(fateId1, fateId2), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + + tts.time += Duration.ofHours(6).toNanos(); + FateId fateId3 = testStore.create(); + + cleaner1.ageOff(); + assertEquals(Set.of(fateId2, fateId3), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + + // create a new cleaner, it should ignore any data stored by previous cleaner + FateCleaner<String> cleaner2 = new FateCleaner<>(testStore, Duration.ofHours(10), tts); + + tts.time += Duration.ofHours(5).toNanos(); + // since this is a new cleaner instance, it should reset the clock + cleaner2.ageOff(); + assertEquals(Set.of(fateId2, fateId3), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + + // since the clock was reset, advancing time should not age anything off + tts.time += Duration.ofHours(9).toNanos(); + cleaner2.ageOff(); + assertEquals(Set.of(fateId2, fateId3), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + + // this should advance time enough to age everything off + tts.time += Duration.ofHours(2).toNanos(); + cleaner2.ageOff(); + assertEquals(Set.of(), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); + } +} diff --cc core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 244edd13d9,3253c41a90..50046a4b9b --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@@ -18,25 -18,13 +18,25 @@@ */ package org.apache.accumulo.core.fate; +import java.io.Serializable; - import java.time.Duration; import java.util.ArrayList; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; + import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.stream.Stream; + +import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import 
org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.util.Pair; /** * Transient in memory store for transactions. @@@ -82,118 -61,35 +82,118 @@@ public class TestStore implements FateS } } - @Override - public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) { - if (!reserved.remove(tid)) { - throw new IllegalStateException(); + private class TestFateTxStore implements FateTxStore<String> { + + private final FateId fateId; + + TestFateTxStore(FateId fateId) { + this.fateId = fateId; } - } - @Override - public org.apache.accumulo.core.fate.TStore.TStatus getStatus(long tid) { - if (!reserved.contains(tid)) { - throw new IllegalStateException(); + @Override + public Repo<String> top() { + throw new UnsupportedOperationException(); } - TStatus status = statuses.get(tid); - if (status == null) { - return TStatus.UNKNOWN; + @Override + public List<ReadOnlyRepo<String>> getStack() { + throw new UnsupportedOperationException(); } - return status; - } - @Override - public void setStatus(long tid, org.apache.accumulo.core.fate.TStore.TStatus status) { - if (!reserved.contains(tid)) { - throw new IllegalStateException(); + @Override + public TStatus getStatus() { + return getStatusAndKey().getFirst(); + } + + @Override + public Optional<FateKey> getKey() { + return getStatusAndKey().getSecond(); + } + + @Override + public Pair<TStatus,Optional<FateKey>> getStatusAndKey() { + if (!reserved.contains(fateId)) { + throw new IllegalStateException(); + } + + Pair<TStatus,Optional<FateKey>> status = statuses.get(fateId); + if (status == null) { + return new Pair<>(TStatus.UNKNOWN, Optional.empty()); + } + + return status; + } + + @Override + public TStatus waitForStatusChange(EnumSet<TStatus> expected) { + throw new UnsupportedOperationException(); + } + + @Override + public Serializable getTransactionInfo(Fate.TxInfo txInfo) { + var submap = txInfos.get(fateId); + if (submap == null) { + return null; + } + + return submap.get(txInfo); + } + + @Override + public long timeCreated() { + throw new UnsupportedOperationException(); + } + + @Override + public FateId getID() { + return fateId; + } + + @Override + public void push(Repo<String> repo) throws StackOverflowException { + throw new UnsupportedOperationException(); + } + + @Override + public void pop() { + throw new UnsupportedOperationException(); + } + + @Override + public void setStatus(TStatus status) { + if (!reserved.contains(fateId)) { + throw new IllegalStateException(); + } + Pair<TStatus,Optional<FateKey>> currentStatus = statuses.get(fateId); + if (currentStatus == null) { + throw new IllegalStateException(); + } + statuses.put(fateId, new Pair<>(status, currentStatus.getSecond())); + } + + @Override + public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) { + if (!reserved.contains(fateId)) { + throw new IllegalStateException(); + } + + txInfos.computeIfAbsent(fateId, t -> new HashMap<>()).put(txInfo, val); + } + + @Override + public void delete() { + if (!reserved.contains(fateId)) { + throw new IllegalStateException(); + } + statuses.remove(fateId); } - if (!statuses.containsKey(tid)) { - throw new IllegalStateException(); + + @Override - public void unreserve(Duration deferTime) { ++ public void unreserve(long deferTime, TimeUnit timeUnit) { + if (!reserved.remove(fateId)) { + throw new IllegalStateException(); + } } - 
statuses.put(tid, status); } @Override diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index e2e7956673,a1a85e7b1c..16dd39b09f --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@@ -35,11 -39,10 +35,10 @@@ import static org.apache.accumulo.test. import static org.apache.accumulo.test.util.FileMetadataUtil.splitFilesIntoRanges; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; - import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; @@@ -50,14 -52,15 +49,15 @@@ import java.util.Map.Entry import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.UUID; + import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.accumulo.compactor.Compactor; import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv; -import org.apache.accumulo.coordinator.CompactionCoordinator; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; @@@ -225,191 -203,6 +225,191 @@@ public class ExternalCompaction_1_IT ex } } + /** + * This test verifies the dead compaction detector does not remove compactions that are committing + * in fate for the Root table. + */ + @Test + public void testCompactionCommitAndDeadDetectionRoot() throws Exception { + var ctx = getCluster().getServerContext(); + FateStore<Manager> zkStore = + new ZooStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooReaderWriter()); + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + var tableId = ctx.getTableId(AccumuloTable.ROOT.tableName()); + var allCids = new HashMap<TableId,List<ExternalCompactionId>>(); + var fateId = createCompactionCommitAndDeadMetadata(c, zkStore, AccumuloTable.ROOT.tableName(), + allCids); + verifyCompactionCommitAndDead(zkStore, tableId, fateId, allCids.get(tableId)); + } + } + + /** + * This test verifies the dead compaction detector does not remove compactions that are committing + * in fate for the Metadata table. 
+ */ + @Test + public void testCompactionCommitAndDeadDetectionMeta() throws Exception { + var ctx = getCluster().getServerContext(); + FateStore<Manager> zkStore = + new ZooStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooReaderWriter()); + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + // Metadata table by default already has 2 tablets + var tableId = ctx.getTableId(AccumuloTable.METADATA.tableName()); + var allCids = new HashMap<TableId,List<ExternalCompactionId>>(); + var fateId = createCompactionCommitAndDeadMetadata(c, zkStore, + AccumuloTable.METADATA.tableName(), allCids); + verifyCompactionCommitAndDead(zkStore, tableId, fateId, allCids.get(tableId)); + } + } + + /** + * This test verifies the dead compaction detector does not remove compactions that are committing + * in fate for a User table. + */ + @Test + public void testCompactionCommitAndDeadDetectionUser() throws Exception { + var ctx = getCluster().getServerContext(); + final String tableName = getUniqueNames(1)[0]; + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + AccumuloStore<Manager> accumuloStore = new AccumuloStore<>(ctx); + SortedSet<Text> splits = new TreeSet<>(); + splits.add(new Text(row(MAX_DATA / 2))); + c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits)); + writeData(c, tableName); + + var tableId = ctx.getTableId(tableName); + var allCids = new HashMap<TableId,List<ExternalCompactionId>>(); + var fateId = createCompactionCommitAndDeadMetadata(c, accumuloStore, tableName, allCids); + verifyCompactionCommitAndDead(accumuloStore, tableId, fateId, allCids.get(tableId)); + } + } + + /** + * This test verifies the dead compaction detector does not remove compactions that are committing + * in fate when all data levels have compactions + */ + @Test + public void testCompactionCommitAndDeadDetectionAll() throws Exception { + var ctx = getCluster().getServerContext(); + final String userTable = getUniqueNames(1)[0]; + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + AccumuloStore<Manager> accumuloStore = new AccumuloStore<>(ctx); + FateStore<Manager> zkStore = + new ZooStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooReaderWriter()); + + SortedSet<Text> splits = new TreeSet<>(); + splits.add(new Text(row(MAX_DATA / 2))); + c.tableOperations().create(userTable, new NewTableConfiguration().withSplits(splits)); + writeData(c, userTable); + + Map<TableId,FateId> fateIds = new HashMap<>(); + Map<TableId,List<ExternalCompactionId>> allCids = new HashMap<>(); + + // create compaction metadata for each data level to test + for (String tableName : List.of(AccumuloTable.ROOT.tableName(), + AccumuloTable.METADATA.tableName(), userTable)) { + var tableId = ctx.getTableId(tableName); + var fateStore = FateInstanceType.fromTableId(tableId) == FateInstanceType.USER + ? accumuloStore : zkStore; + fateIds.put(tableId, + createCompactionCommitAndDeadMetadata(c, fateStore, tableName, allCids)); + } + + // verify the dead compaction was removed for each level + // but not the compaction associated with a fate id + for (Entry<TableId,FateId> entry : fateIds.entrySet()) { + var tableId = entry.getKey(); + var fateStore = FateInstanceType.fromTableId(tableId) == FateInstanceType.USER + ? 
accumuloStore : zkStore; + verifyCompactionCommitAndDead(fateStore, tableId, entry.getValue(), allCids.get(tableId)); + } + } + } + + private FateId createCompactionCommitAndDeadMetadata(AccumuloClient c, + FateStore<Manager> fateStore, String tableName, + Map<TableId,List<ExternalCompactionId>> allCids) throws Exception { + var ctx = getCluster().getServerContext(); + c.tableOperations().flush(tableName, null, null, true); + var tableId = ctx.getTableId(tableName); + + allCids.put(tableId, List.of(ExternalCompactionId.generate(UUID.randomUUID()), + ExternalCompactionId.generate(UUID.randomUUID()))); + + // Create a fate transaction for one of the compaction ids that is in the new state; it + // should never run. Its purpose is to prevent the dead compaction detector + // from deleting the id. + FateStore.FateTxStore<Manager> fateTx = fateStore + .createAndReserve(FateKey.forCompactionCommit(allCids.get(tableId).get(0))).orElseThrow(); + var fateId = fateTx.getID(); - fateTx.unreserve(Duration.ZERO); ++ fateTx.unreserve(0, TimeUnit.MILLISECONDS); + + // Read the tablet metadata + var tabletsMeta = ctx.getAmple().readTablets().forTable(tableId).build().stream() + .collect(Collectors.toList()); + // Root is always 1 tablet + if (!tableId.equals(AccumuloTable.ROOT.tableId())) { + assertEquals(2, tabletsMeta.size()); + } + + // Insert fake compaction entries in the metadata table. No compactor will report ownership + // of these, so they should look like dead compactions and be removed. However, one of + // them has an associated fate tx that should prevent its removal. + try (var mutator = ctx.getAmple().mutateTablets()) { + for (int i = 0; i < tabletsMeta.size(); i++) { + var tabletMeta = tabletsMeta.get(0); + var tabletDir = + tabletMeta.getFiles().stream().findFirst().orElseThrow().getPath().getParent(); + var tmpFile = new Path(tabletDir, "C1234.rf_tmp"); + + CompactionMetadata cm = new CompactionMetadata(tabletMeta.getFiles(), + ReferencedTabletFile.of(tmpFile), "localhost:16789", CompactionKind.SYSTEM, (short) 10, + CompactorGroupId.of(GROUP1), false, null); + + mutator.mutateTablet(tabletMeta.getExtent()) + .putExternalCompaction(allCids.get(tableId).get(i), cm).mutate(); + } + } + + return fateId; + } + + private void verifyCompactionCommitAndDead(FateStore<Manager> fateStore, TableId tableId, + FateId fateId, List<ExternalCompactionId> cids) { + var ctx = getCluster().getServerContext(); + + // Wait until the compaction id w/o a fate transaction is removed, should still see the one + // with a fate transaction + Wait.waitFor(() -> { + Set<ExternalCompactionId> currentIds = ctx.getAmple().readTablets().forTable(tableId).build() + .stream().map(TabletMetadata::getExternalCompactions) + .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet()); + System.out.println("currentIds1:" + currentIds); + assertTrue(currentIds.size() == 1 || currentIds.size() == 2); + return currentIds.equals(Set.of(cids.get(0))); + }); + + // Delete the fate transaction, should allow the dead compaction detector to clean up the + // remaining external compaction id + var fateTx = fateStore.reserve(fateId); + fateTx.delete(); - fateTx.unreserve(Duration.ZERO); ++ fateTx.unreserve(0, TimeUnit.MILLISECONDS); + + // wait for the remaining compaction id to be removed + Wait.waitFor(() -> { + Set<ExternalCompactionId> currentIds = ctx.getAmple().readTablets().forTable(tableId).build() + .stream().map(TabletMetadata::getExternalCompactions) + .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet());
+ System.out.println("currentIds2:" + currentIds); + assertTrue(currentIds.size() <= 1); + return currentIds.isEmpty(); + }); + } + @Test public void testCompactionAndCompactorDies() throws Exception { String table1 = this.getUniqueNames(1)[0]; diff --cc test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java index 9b43c9c2b9,0000000000..63e8d64703 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java @@@ -1,510 -1,0 +1,511 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.accumulo; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.lang.reflect.Method; +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; ++import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.ReadOnlyRepo; +import org.apache.accumulo.core.fate.StackOverflowException; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.fate.FateIT.TestRepo; +import org.apache.accumulo.test.fate.FateTestRunner; +import org.apache.accumulo.test.fate.FateTestRunner.TestEnv; +import org.apache.accumulo.test.util.Wait; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; + +import 
com.google.common.base.Throwables; + +public abstract class FateStoreIT extends SharedMiniClusterBase implements FateTestRunner<TestEnv> { + + private static final Method fsCreateByKeyMethod; + + static { + try { + // Private method, need to capture for testing + fsCreateByKeyMethod = AbstractFateStore.class.getDeclaredMethod("create", FateKey.class); + fsCreateByKeyMethod.setAccessible(true); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + } + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(1); + } + + @Test + public void testReadWrite() throws Exception { + executeTest(this::testReadWrite); + } + + protected void testReadWrite(FateStore<TestEnv> store, ServerContext sctx) + throws StackOverflowException { + // Verify no transactions + assertEquals(0, store.list().count()); + + // Create a new transaction and get the store for it + FateId fateId = store.create(); + FateTxStore<TestEnv> txStore = store.reserve(fateId); + assertTrue(txStore.timeCreated() > 0); + assertFalse(txStore.getKey().isPresent()); + assertEquals(1, store.list().count()); + + // Push a test FATE op and verify we can read it back + txStore.setStatus(TStatus.IN_PROGRESS); + txStore.push(new TestRepo("testOp")); + TestRepo op = (TestRepo) txStore.top(); + assertNotNull(op); + + // Test status + txStore.setStatus(TStatus.SUBMITTED); + assertEquals(TStatus.SUBMITTED, txStore.getStatus()); + + // Set a name to test setTransactionInfo() + txStore.setTransactionInfo(TxInfo.TX_NAME, "name"); + assertEquals("name", txStore.getTransactionInfo(TxInfo.TX_NAME)); + + // Try setting a second test op to test getStack() + // when listing or popping TestOperation2 should be first + assertEquals(1, txStore.getStack().size()); + txStore.setStatus(TStatus.IN_PROGRESS); + txStore.push(new TestOperation2()); + // test top returns TestOperation2 + ReadOnlyRepo<TestEnv> top = txStore.top(); + assertInstanceOf(TestOperation2.class, top); + + // test get stack + List<ReadOnlyRepo<TestEnv>> ops = txStore.getStack(); + assertEquals(2, ops.size()); + assertInstanceOf(TestOperation2.class, ops.get(0)); + assertEquals(TestRepo.class, ops.get(1).getClass()); + + // test pop, TestOperation should be left + txStore.setStatus(TStatus.FAILED_IN_PROGRESS); // needed to satisfy the condition on pop + txStore.pop(); + ops = txStore.getStack(); + assertEquals(1, ops.size()); + assertEquals(TestRepo.class, ops.get(0).getClass()); + + // create second + FateTxStore<TestEnv> txStore2 = store.reserve(store.create()); + assertEquals(2, store.list().count()); + + // test delete + txStore.setStatus(TStatus.SUCCESSFUL); // needed to satisfy the condition on delete + txStore.delete(); + assertEquals(1, store.list().count()); + txStore2.setStatus(TStatus.SUCCESSFUL); // needed to satisfy the condition on delete + txStore2.delete(); + assertEquals(0, store.list().count()); + } + + @Test + public void testReadWriteTxInfo() throws Exception { + executeTest(this::testReadWriteTxInfo); + } + + protected void testReadWriteTxInfo(FateStore<TestEnv> store, ServerContext sctx) { + FateId fateId = store.create(); + FateTxStore<TestEnv> txStore = store.reserve(fateId); + + try { + // Go through all enum values to verify each TxInfo type will be properly + // written and read from the store + for (TxInfo txInfo : TxInfo.values()) { + assertNull(txStore.getTransactionInfo(txInfo)); + txStore.setTransactionInfo(txInfo, "value: " + txInfo.name()); + assertEquals("value: " + txInfo.name(), 
txStore.getTransactionInfo(txInfo)); + } + } finally { + txStore.delete(); + } + + } + + @Test + public void testDeferredOverflow() throws Exception { + executeTest(this::testDeferredOverflow, 10, AbstractFateStore.DEFAULT_FATE_ID_GENERATOR); + } + + protected void testDeferredOverflow(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + // Verify no transactions + assertEquals(0, store.list().count()); + assertFalse(store.isDeferredOverflow()); + + // Store 10 transactions that are all deferred + final Set<FateId> transactions = new HashSet<>(); + for (int i = 0; i < 10; i++) { + FateId fateId = store.create(); + transactions.add(fateId); + FateTxStore<TestEnv> txStore = store.reserve(fateId); + txStore.setStatus(TStatus.SUBMITTED); + assertTrue(txStore.timeCreated() > 0); - txStore.unreserve(Duration.ofSeconds(10)); ++ txStore.unreserve(10, TimeUnit.SECONDS); + } + + // Verify we have 10 transactions and all are deferred + assertEquals(10, store.list().count()); + assertEquals(10, store.getDeferredCount()); + + // Should still be false as we are at the max but not over yet + assertFalse(store.isDeferredOverflow()); + + var executor = Executors.newCachedThreadPool(); + Future<?> future; + AtomicBoolean keepRunning = new AtomicBoolean(true); + try { + // Run and verify all 10 transactions still exist and were not + // run because of the deferral time of all the transactions + future = executor.submit(() -> store.runnable(keepRunning, transactions::remove)); + Thread.sleep(2000); + assertEquals(10, transactions.size()); + // Setting this flag to false should terminate the task if sleeping + keepRunning.set(false); + // wait for the future to finish to verify the task finished + future.get(); + + // Store one more that should go over the max deferred of 10 + // and should clear the map and set the overflow flag + FateId fateId = store.create(); + transactions.add(fateId); + FateTxStore<TestEnv> txStore = store.reserve(fateId); + txStore.setStatus(TStatus.SUBMITTED); - txStore.unreserve(Duration.ofSeconds(30)); ++ txStore.unreserve(30, TimeUnit.SECONDS); + + // Verify we have 11 transactions stored and none + // deferred anymore because of the overflow + assertEquals(11, store.list().count()); + assertEquals(0, store.getDeferredCount()); + assertTrue(store.isDeferredOverflow()); + + // Run and verify all 11 transactions were processed + // and removed from the store + keepRunning.set(true); + future = executor.submit(() -> store.runnable(keepRunning, transactions::remove)); + Wait.waitFor(transactions::isEmpty); + // Setting this flag to false should terminate the task if sleeping + keepRunning.set(false); + // wait for the future to finish to verify the task finished + future.get(); + + // Overflow should now be reset to false so adding another deferred + // transaction should now go back into the deferral map and flag should + // still be false as we are under the limit + assertFalse(store.isDeferredOverflow()); + txStore = store.reserve(store.create()); - txStore.unreserve(Duration.ofSeconds(30)); ++ txStore.unreserve(30, TimeUnit.SECONDS); + assertEquals(1, store.getDeferredCount()); + assertFalse(store.isDeferredOverflow()); + } finally { + executor.shutdownNow(); + // Cleanup so we don't interfere with other tests + // All stores should already be unreserved + store.list().forEach( + fateIdStatus -> store.tryReserve(fateIdStatus.getFateId()).orElseThrow().delete()); + } + } + + @Test + public void testCreateWithKey() throws Exception { +
executeTest(this::testCreateWithKey); + } + + protected void testCreateWithKey(FateStore<TestEnv> store, ServerContext sctx) { + KeyExtent ke1 = + new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new Text("aaa")); + + long existing = store.list().count(); + FateKey fateKey1 = FateKey.forSplit(ke1); + FateKey fateKey2 = + FateKey.forCompactionCommit(ExternalCompactionId.generate(UUID.randomUUID())); + + FateTxStore<TestEnv> txStore1 = store.createAndReserve(fateKey1).orElseThrow(); + FateTxStore<TestEnv> txStore2 = store.createAndReserve(fateKey2).orElseThrow(); + + assertNotEquals(txStore1.getID(), txStore2.getID()); + + try { + assertTrue(txStore1.timeCreated() > 0); + assertEquals(TStatus.NEW, txStore1.getStatus()); + assertEquals(fateKey1, txStore1.getKey().orElseThrow()); + + assertTrue(txStore2.timeCreated() > 0); + assertEquals(TStatus.NEW, txStore2.getStatus()); + assertEquals(fateKey2, txStore2.getKey().orElseThrow()); + + assertEquals(existing + 2, store.list().count()); + } finally { + txStore1.delete(); + txStore2.delete(); - txStore1.unreserve(Duration.ZERO); - txStore2.unreserve(Duration.ZERO); ++ txStore1.unreserve(0, TimeUnit.SECONDS); ++ txStore2.unreserve(0, TimeUnit.SECONDS); + } + } + + @Test + public void testCreateWithKeyDuplicate() throws Exception { + executeTest(this::testCreateWithKeyDuplicate); + } + + protected void testCreateWithKeyDuplicate(FateStore<TestEnv> store, ServerContext sctx) { + KeyExtent ke = + new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new Text("aaa")); + + // Creating with the same key should be fine if the status is NEW + // A second call to createAndReserve() should just return an empty optional + // since it's already in reserved and in progress + FateKey fateKey = FateKey.forSplit(ke); + FateTxStore<TestEnv> txStore = store.createAndReserve(fateKey).orElseThrow(); + + // second call is empty + assertTrue(store.createAndReserve(fateKey).isEmpty()); + + try { + assertTrue(txStore.timeCreated() > 0); + assertEquals(TStatus.NEW, txStore.getStatus()); + assertEquals(fateKey, txStore.getKey().orElseThrow()); + assertEquals(1, store.list().count()); + } finally { + txStore.delete(); - txStore.unreserve(Duration.ZERO); ++ txStore.unreserve(0, TimeUnit.SECONDS); + } + } + + @Test + public void testCreateWithKeyInProgress() throws Exception { + executeTest(this::testCreateWithKeyInProgress); + } + + protected void testCreateWithKeyInProgress(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + KeyExtent ke = + new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new Text("aaa")); + FateKey fateKey = FateKey.forSplit(ke); + + FateTxStore<TestEnv> txStore = store.createAndReserve(fateKey).orElseThrow(); + + try { + assertTrue(txStore.timeCreated() > 0); + txStore.setStatus(TStatus.IN_PROGRESS); + + // We have an existing transaction with the same key in progress + // so should return an empty Optional + assertTrue(create(store, fateKey).isEmpty()); + assertEquals(TStatus.IN_PROGRESS, txStore.getStatus()); + } finally { + txStore.setStatus(TStatus.SUCCESSFUL); + txStore.delete(); - txStore.unreserve(Duration.ZERO); ++ txStore.unreserve(0, TimeUnit.SECONDS); + } + + try { + // After deletion, make sure we can create again with the same key + txStore = store.createAndReserve(fateKey).orElseThrow(); + assertTrue(txStore.timeCreated() > 0); + assertEquals(TStatus.NEW, txStore.getStatus()); + } finally { + txStore.delete(); - txStore.unreserve(Duration.ZERO); ++ txStore.unreserve(0, 
TimeUnit.SECONDS); + } + + } + + @Test + public void testCreateWithKeyCollision() throws Exception { + // Replace the default hashing algorithm with one that always returns the same tid so + // we can check duplicate detection with different keys + executeTest(this::testCreateWithKeyCollision, AbstractFateStore.DEFAULT_MAX_DEFERRED, + (instanceType, fateKey) -> FateId.from(instanceType, 1000)); + } + + protected void testCreateWithKeyCollision(FateStore<TestEnv> store, ServerContext sctx) { + String[] tables = getUniqueNames(2); + KeyExtent ke1 = new KeyExtent(TableId.of(tables[0]), new Text("zzz"), new Text("aaa")); + KeyExtent ke2 = new KeyExtent(TableId.of(tables[1]), new Text("ddd"), new Text("bbb")); + + FateKey fateKey1 = FateKey.forSplit(ke1); + FateKey fateKey2 = FateKey.forSplit(ke2); + + FateTxStore<TestEnv> txStore = store.createAndReserve(fateKey1).orElseThrow(); + try { + var e = assertThrows(IllegalStateException.class, () -> create(store, fateKey2)); + assertEquals("Collision detected for tid 1000", e.getMessage()); + assertEquals(fateKey1, txStore.getKey().orElseThrow()); + } finally { + txStore.delete(); - txStore.unreserve(Duration.ZERO); ++ txStore.unreserve(0, TimeUnit.SECONDS); + } + + } + + @Test + public void testCollisionWithRandomFateId() throws Exception { + executeTest(this::testCollisionWithRandomFateId); + } + + protected void testCollisionWithRandomFateId(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + KeyExtent ke = + new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new Text("aaa")); + + FateKey fateKey = FateKey.forSplit(ke); + FateId fateId = create(store, fateKey).orElseThrow(); + + // After creating a fate transaction using a key we can simulate a collision with + // a random FateId by deleting the key out of Fate and calling create again to verify + // it detects the key is missing. Then we can continue and see if we can still reserve + // and use the existing transaction, which we should.
+ deleteKey(fateId, sctx); + var e = assertThrows(IllegalStateException.class, () -> store.createAndReserve(fateKey)); + assertEquals("Tx Key is missing from tid " + fateId.getTid(), e.getMessage()); + + // We should still be able to reserve and continue when not using a key + // just like a normal transaction + FateTxStore<TestEnv> txStore = store.reserve(fateId); + try { + assertTrue(txStore.timeCreated() > 0); + assertEquals(TStatus.NEW, txStore.getStatus()); + } finally { + txStore.delete(); - txStore.unreserve(Duration.ZERO); ++ txStore.unreserve(0, TimeUnit.SECONDS); + } + + } + + @Test + public void testListFateKeys() throws Exception { + executeTest(this::testListFateKeys); + } + + protected void testListFateKeys(FateStore<TestEnv> store, ServerContext sctx) throws Exception { + + // this should not be seen when listing by key type because it has no key + var id1 = store.create(); + + TableId tid1 = TableId.of("test"); + var extent1 = new KeyExtent(tid1, new Text("m"), null); + var extent2 = new KeyExtent(tid1, null, new Text("m")); + var fateKey1 = FateKey.forSplit(extent1); + var fateKey2 = FateKey.forSplit(extent2); + + var cid1 = ExternalCompactionId.generate(UUID.randomUUID()); + var cid2 = ExternalCompactionId.generate(UUID.randomUUID()); + + assertNotEquals(cid1, cid2); + + var fateKey3 = FateKey.forCompactionCommit(cid1); + var fateKey4 = FateKey.forCompactionCommit(cid2); + + Map<FateKey,FateId> fateKeyIds = new HashMap<>(); + for (FateKey fateKey : List.of(fateKey1, fateKey2, fateKey3, fateKey4)) { + var fateTx = store.createAndReserve(fateKey).orElseThrow(); + fateKeyIds.put(fateKey, fateTx.getID()); - fateTx.unreserve(Duration.ZERO); ++ fateTx.unreserve(0, TimeUnit.MILLISECONDS); + } + + HashSet<FateId> allIds = new HashSet<>(); + allIds.addAll(fateKeyIds.values()); + allIds.add(id1); + assertEquals(allIds, store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet())); + assertEquals(5, allIds.size()); + + assertEquals(4, fateKeyIds.size()); + assertEquals(4, fateKeyIds.values().stream().distinct().count()); + + HashSet<KeyExtent> seenExtents = new HashSet<>(); + store.list(FateKey.FateKeyType.SPLIT).forEach(fateKey -> { + assertEquals(FateKey.FateKeyType.SPLIT, fateKey.getType()); + assertNotNull(fateKeyIds.remove(fateKey)); + assertTrue(seenExtents.add(fateKey.getKeyExtent().orElseThrow())); + }); + + assertEquals(2, fateKeyIds.size()); + assertEquals(Set.of(extent1, extent2), seenExtents); + + HashSet<ExternalCompactionId> seenCids = new HashSet<>(); + store.list(FateKey.FateKeyType.COMPACTION_COMMIT).forEach(fateKey -> { + assertEquals(FateKey.FateKeyType.COMPACTION_COMMIT, fateKey.getType()); + assertNotNull(fateKeyIds.remove(fateKey)); + assertTrue(seenCids.add(fateKey.getCompactionId().orElseThrow())); + }); + + assertEquals(0, fateKeyIds.size()); + assertEquals(Set.of(cid1, cid2), seenCids); + } + + // create(fateKey) method is private so expose for testing to check error states + @SuppressWarnings("unchecked") + protected Optional<FateId> create(FateStore<TestEnv> store, FateKey fateKey) throws Exception { + try { + return (Optional<FateId>) fsCreateByKeyMethod.invoke(store, fateKey); + } catch (Exception e) { + Exception rootCause = (Exception) Throwables.getRootCause(e); + throw rootCause; + } + } + + protected abstract void deleteKey(FateId fateId, ServerContext sctx); + + private static class TestOperation2 extends TestRepo { + + private static final long serialVersionUID = 1L; + + public TestOperation2() { + super("testOperation2"); + } + } + +}
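
For reference, a minimal standalone sketch of the deferral bookkeeping that testDeferredOverflow exercises above: a transaction unreserved with a positive defer time (e.g. unreserve(10, TimeUnit.SECONDS)) is tracked in a deferral map up to a configured maximum, after which the map is cleared and an overflow flag is set so everything is treated as runnable again. The DeferralSketch class below is a hypothetical illustration only; it is not code from this commit or part of the Accumulo API.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DeferralSketch {

  private final int maxDeferred;
  private final Map<Long,Long> deferredUntilNanos = new ConcurrentHashMap<>();
  private final AtomicBoolean deferredOverflow = new AtomicBoolean(false);

  DeferralSketch(int maxDeferred) {
    this.maxDeferred = maxDeferred;
  }

  // Called when a transaction is unreserved with a positive defer time.
  void defer(long txId, long deferTime, TimeUnit unit) {
    if (deferredOverflow.get()) {
      return; // already overflowed; individual deferrals are no longer tracked
    }
    if (deferredUntilNanos.size() >= maxDeferred) {
      // Going over the max clears the map and flags overflow, so all transactions
      // are treated as runnable immediately (the behavior the test verifies).
      deferredOverflow.set(true);
      deferredUntilNanos.clear();
      return;
    }
    deferredUntilNanos.put(txId, System.nanoTime() + unit.toNanos(deferTime));
  }

  // Runnable if overflow is set, the transaction was never deferred, or its delay elapsed.
  boolean isRunnable(long txId) {
    Long until = deferredUntilNanos.get(txId);
    return deferredOverflow.get() || until == null || System.nanoTime() - until >= 0;
  }

  int getDeferredCount() {
    return deferredUntilNanos.size();
  }

  boolean isDeferredOverflow() {
    return deferredOverflow.get();
  }

  public static void main(String[] args) {
    DeferralSketch store = new DeferralSketch(10);
    for (long txId = 0; txId < 10; txId++) {
      store.defer(txId, 10, TimeUnit.SECONDS);
    }
    // At the max but not over it yet: prints "10 deferred, overflow=false"
    System.out.println(store.getDeferredCount() + " deferred, overflow=" + store.isDeferredOverflow());
    // The 11th deferral pushes past the max: prints "0 deferred, overflow=true"
    store.defer(10, 30, TimeUnit.SECONDS);
    System.out.println(store.getDeferredCount() + " deferred, overflow=" + store.isDeferredOverflow());
    System.out.println("tx 3 runnable now? " + store.isRunnable(3));
  }
}

As the later assertions in that test show, the real stores also reset the overflow flag once the deferred work has been processed; the sketch omits that step for brevity.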