This is an automated email from the ASF dual-hosted git repository. krathbun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 7da7ac89cc Fixes discrepancy between `admin fate delete` and `UserFateStore.delete` (#5016) 7da7ac89cc is described below commit 7da7ac89cc54a4c8e660eb713b0afd5c525c0ec9 Author: Kevin Rathbun <krath...@apache.org> AuthorDate: Tue Nov 5 08:37:31 2024 -0500 Fixes discrepancy between `admin fate delete` and `UserFateStore.delete` (#5016) * Fixes discrepancy between `admin fate delete` and `UserFateStore.delete` `admin fate delete` allows users to delete a transaction. If this transaction is a USER transaction, `UserFateStore.delete()` is how this is achieved. The admin code allows transactions with a status of SUBMITTED, IN_PROGRESS, NEW, FAILED, FAILED_IN_PROGRESS, SUCCESSFUL to call `delete()`, but the `delete()` code only allows NEW, SUBMITTED, SUCCESSFUL, FAILED. Admin now calls a force delete to delete the transaction regardless of state. forceDelete() is a new method only to be used by Admin. --- .../main/java/org/apache/accumulo/core/fate/AdminUtil.java | 2 +- core/src/main/java/org/apache/accumulo/core/fate/Fate.java | 2 +- .../main/java/org/apache/accumulo/core/fate/FateStore.java | 6 ++++++ .../org/apache/accumulo/core/fate/WrappedFateTxStore.java | 12 +++++++++++- .../org/apache/accumulo/core/fate/user/UserFateStore.java | 12 ++++++++++++ .../apache/accumulo/core/fate/zookeeper/MetaFateStore.java | 5 +++++ .../java/org/apache/accumulo/core/logging/FateLogger.java | 13 ++++++++----- .../test/java/org/apache/accumulo/core/fate/TestStore.java | 6 ++++++ 8 files changed, 50 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 97c47b39ca..059044b754 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -476,7 +476,7 @@ public class AdminUtil<T> { case FAILED_IN_PROGRESS: case SUCCESSFUL: System.out.printf("Deleting transaction: %s (%s)%n", fateIdStr, ts); - txStore.delete(); + txStore.forceDelete(); state = true; break; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 1df4c36eec..1350cce652 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -344,7 +344,7 @@ public class Fate<T> { */ public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner, Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) { - this.store = FateLogger.wrap(store, toLogStrFunc); + this.store = FateLogger.wrap(store, toLogStrFunc, false); this.environment = environment; final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE, true); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index ae193d8df8..09ee12dd94 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -104,6 +104,12 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> { */ void delete(); + /** + * Force remove the transaction from the store regardless of the status. Only to be used by + * {@link AdminUtil} + */ + void forceDelete(); + /** * Return the given transaction to the store. * diff --git a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java index f121a902e0..d282e304f1 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java @@ -24,11 +24,15 @@ import java.util.EnumSet; import java.util.List; import java.util.Optional; +import com.google.common.base.Preconditions; + public class WrappedFateTxStore<T> implements FateStore.FateTxStore<T> { protected final FateStore.FateTxStore<T> wrapped; + private final boolean allowForceDel; - public WrappedFateTxStore(FateStore.FateTxStore<T> wrapped) { + public WrappedFateTxStore(FateStore.FateTxStore<T> wrapped, boolean allowForceDel) { this.wrapped = wrapped; + this.allowForceDel = allowForceDel; } @Override @@ -86,6 +90,12 @@ public class WrappedFateTxStore<T> implements FateStore.FateTxStore<T> { wrapped.delete(); } + @Override + public void forceDelete() { + Preconditions.checkState(allowForceDel, "Force delete is not allowed"); + wrapped.forceDelete(); + } + @Override public long timeCreated() { return wrapped.timeCreated(); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index 7446d1fafe..efd0cbc62f 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -544,6 +544,18 @@ public class UserFateStore<T> extends AbstractFateStore<T> { this.deleted = true; } + @Override + public void forceDelete() { + verifyReservedAndNotDeleted(true); + + var mutator = newMutator(fateId); + // allow deletion of all txns other than UNKNOWN + mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED, + TStatus.FAILED_IN_PROGRESS, TStatus.IN_PROGRESS); + mutator.delete().mutate(); + this.deleted = true; + } + private Optional<Integer> findTop() { return scanTx(scanner -> { scanner.setRange(getRow(fateId)); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java index d19db17004..d6da05e844 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java @@ -369,6 +369,11 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { } } + @Override + public void forceDelete() { + delete(); + } + @Override public void setTransactionInfo(Fate.TxInfo txInfo, Serializable so) { verifyReservedAndNotDeleted(true); diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index 17a64541c9..4a9f2517c0 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -51,8 +51,9 @@ public class FateLogger { private final Function<Repo<T>,String> toLogString; - private LoggingFateTxStore(FateTxStore<T> wrapped, Function<Repo<T>,String> toLogString) { - super(wrapped); + private LoggingFateTxStore(FateTxStore<T> wrapped, Function<Repo<T>,String> toLogString, + boolean allowForceDel) { + super(wrapped, allowForceDel); this.toLogString = toLogString; } @@ -97,19 +98,21 @@ public class FateLogger { } } - public static <T> FateStore<T> wrap(FateStore<T> store, Function<Repo<T>,String> toLogString) { + public static <T> FateStore<T> wrap(FateStore<T> store, Function<Repo<T>,String> toLogString, + boolean allowForceDel) { // only logging operations that change the persisted data, not operations that only read data return new FateStore<>() { @Override public FateTxStore<T> reserve(FateId fateId) { - return new LoggingFateTxStore<>(store.reserve(fateId), toLogString); + return new LoggingFateTxStore<>(store.reserve(fateId), toLogString, allowForceDel); } @Override public Optional<FateTxStore<T>> tryReserve(FateId fateId) { - return store.tryReserve(fateId).map(ftxs -> new LoggingFateTxStore<>(ftxs, toLogString)); + return store.tryReserve(fateId) + .map(ftxs -> new LoggingFateTxStore<>(ftxs, toLogString, allowForceDel)); } @Override diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 859fe5040a..2c54464663 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -199,6 +199,12 @@ public class TestStore implements FateStore<String> { statuses.remove(fateId); } + @Override + public void forceDelete() { + throw new UnsupportedOperationException( + this.getClass().getSimpleName() + " should not be calling forceDelete()"); + } + @Override public void unreserve(Duration deferTime) { if (!reserved.remove(fateId)) {