This is an automated email from the ASF dual-hosted git repository. krathbun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 0ff1b21f2e Adds FateOperation type (#5218) 0ff1b21f2e is described below commit 0ff1b21f2e79f9a82347485739258b93a9bdc259 Author: Kevin Rathbun <krath...@apache.org> AuthorDate: Tue Jan 7 10:10:12 2025 -0500 Adds FateOperation type (#5218) Adds `FateOperation` enum. This consolidates all fate operations under one type (those used in thrift and those passed directly to a Fate object (outside of thrift)). This avoids the use of String here. - Adds a `FateOperation` enum which includes all current fate operations - Renamed existing thrift type `FateOperation` to `TFateOperation` - `FateOperation` includes all `TFateOperation`s and fate operations performed outside of thrift (one example is `COMMIT_COMPACTION`) - `FateOperation` is now the type passed around instead of a String - Removed OBSOLETE_TABLE_BULK_IMPORT from `TFateOperation` since it is no longer used (was bulk import v1). Keep enum integers stable to avoid confusion across different versions of Accumulo - New `FateOperationTest` to test `TFateOperation` and `FateOperation` --- .../core/clientImpl/NamespaceOperationsImpl.java | 10 ++-- .../core/clientImpl/TableOperationsImpl.java | 44 ++++++++-------- .../org/apache/accumulo/core/fate/AdminUtil.java | 11 ++-- .../java/org/apache/accumulo/core/fate/Fate.java | 55 ++++++++++++++++++-- .../org/apache/accumulo/core/fate/FateStore.java | 9 ++-- .../accumulo/core/fate/user/UserFateStore.java | 8 +-- .../core/fate/zookeeper/MetaFateStore.java | 7 +-- .../apache/accumulo/core/logging/FateLogger.java | 6 +-- .../accumulo/core/manager/thrift/FateService.java | 38 +++++++------- .../{FateOperation.java => TFateOperation.java} | 9 ++-- core/src/main/thrift/manager.thrift | 42 ++++++++-------- .../accumulo/core/fate/FateOperationTest.java | 48 ++++++++++++++++++ .../org/apache/accumulo/core/fate/TestStore.java | 6 +-- .../server/security/AuditedSecurityOperation.java | 8 +-- .../server/security/SecurityOperation.java | 4 +- .../server/util/fateCommand/FateSummaryReport.java | 6 ++- .../server/util/fateCommand/FateTxnDetails.java | 2 +- .../server/util/fateCommand/TxnDetailsTest.java | 7 +-- .../accumulo/manager/FateServiceHandler.java | 58 ++++++++++++---------- .../manager/ManagerClientServiceHandler.java | 2 +- .../coordinator/CompactionCoordinator.java | 4 +- .../manager/metrics/fate/FateMetricValues.java | 9 ++-- .../accumulo/manager/split/SeedSplitTask.java | 5 +- .../test/compaction/ExternalCompaction_1_IT.java | 3 +- .../java/org/apache/accumulo/test/fate/FateIT.java | 21 ++++---- .../org/apache/accumulo/test/fate/FateStoreIT.java | 32 ++++++------ .../apache/accumulo/test/fate/FateStoreUtil.java | 8 ++- .../accumulo/test/fate/MultipleStoresIT.java | 7 +-- .../test/functional/FateConcurrencyIT.java | 5 +- 29 files changed, 297 insertions(+), 177 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/NamespaceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/NamespaceOperationsImpl.java index 378147bea6..2e80915fb8 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/NamespaceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/NamespaceOperationsImpl.java @@ -56,7 +56,7 @@ import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.constraints.Constraint; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.manager.thrift.FateOperation; +import org.apache.accumulo.core.manager.thrift.TFateOperation; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.LocalityGroupUtil; @@ -126,7 +126,7 @@ public class NamespaceOperationsImpl extends NamespaceOperationsHelper { NEW_NAMESPACE_NAME.validate(namespace); try { - doNamespaceFateOperation(FateOperation.NAMESPACE_CREATE, + doNamespaceFateOperation(TFateOperation.NAMESPACE_CREATE, Arrays.asList(ByteBuffer.wrap(namespace.getBytes(UTF_8))), Collections.emptyMap(), namespace); } catch (NamespaceNotFoundException e) { @@ -156,7 +156,7 @@ public class NamespaceOperationsImpl extends NamespaceOperationsHelper { Map<String,String> opts = new HashMap<>(); try { - doNamespaceFateOperation(FateOperation.NAMESPACE_DELETE, args, opts, namespace); + doNamespaceFateOperation(TFateOperation.NAMESPACE_DELETE, args, opts, namespace); } catch (NamespaceExistsException e) { // should not happen throw new AssertionError(e); @@ -174,7 +174,7 @@ public class NamespaceOperationsImpl extends NamespaceOperationsHelper { List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(oldNamespaceName.getBytes(UTF_8)), ByteBuffer.wrap(newNamespaceName.getBytes(UTF_8))); Map<String,String> opts = new HashMap<>(); - doNamespaceFateOperation(FateOperation.NAMESPACE_RENAME, args, opts, oldNamespaceName); + doNamespaceFateOperation(TFateOperation.NAMESPACE_RENAME, args, opts, oldNamespaceName); } @Override @@ -385,7 +385,7 @@ public class NamespaceOperationsImpl extends NamespaceOperationsHelper { return super.addConstraint(namespace, constraintClassName); } - private String doNamespaceFateOperation(FateOperation op, List<ByteBuffer> args, + private String doNamespaceFateOperation(TFateOperation op, List<ByteBuffer> args, Map<String,String> opts, String namespace) throws AccumuloSecurityException, AccumuloException, NamespaceExistsException, NamespaceNotFoundException { // caller should validate the namespace name diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index 36de59f9e8..0b1ff70851 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@ -140,11 +140,11 @@ import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.manager.state.tables.TableState; -import org.apache.accumulo.core.manager.thrift.FateOperation; import org.apache.accumulo.core.manager.thrift.FateService; import org.apache.accumulo.core.manager.thrift.ManagerClientService; import org.apache.accumulo.core.manager.thrift.TFateId; import org.apache.accumulo.core.manager.thrift.TFateInstanceType; +import org.apache.accumulo.core.manager.thrift.TFateOperation; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; @@ -274,7 +274,7 @@ public class TableOperationsImpl extends TableOperationsHelper { Map<String,String> opts = ntc.getProperties(); try { - doTableFateOperation(tableName, AccumuloException.class, FateOperation.TABLE_CREATE, args, + doTableFateOperation(tableName, AccumuloException.class, TFateOperation.TABLE_CREATE, args, opts); } catch (TableNotFoundException e) { // should not happen @@ -304,7 +304,7 @@ public class TableOperationsImpl extends TableOperationsHelper { // This method is for retrying in the case of network failures; // anything else it passes to the caller to deal with - private void executeFateOperation(TFateId opid, FateOperation op, List<ByteBuffer> args, + private void executeFateOperation(TFateId opid, TFateOperation op, List<ByteBuffer> args, Map<String,String> opts, boolean autoCleanUp) throws ThriftSecurityException, TException, ThriftTableOperationException { while (true) { @@ -372,7 +372,7 @@ public class TableOperationsImpl extends TableOperationsHelper { EXISTING_TABLE_NAME.validate(tableName); try { - return doFateOperation(FateOperation.TABLE_BULK_IMPORT2, args, Collections.emptyMap(), + return doFateOperation(TFateOperation.TABLE_BULK_IMPORT2, args, Collections.emptyMap(), tableName); } catch (TableExistsException | NamespaceExistsException e) { // should not happen @@ -427,14 +427,14 @@ public class TableOperationsImpl extends TableOperationsHelper { } } - String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts, + String doFateOperation(TFateOperation op, List<ByteBuffer> args, Map<String,String> opts, String tableOrNamespaceName) throws AccumuloSecurityException, TableExistsException, TableNotFoundException, AccumuloException, NamespaceExistsException, NamespaceNotFoundException { return doFateOperation(op, args, opts, tableOrNamespaceName, true); } - String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts, + String doFateOperation(TFateOperation op, List<ByteBuffer> args, Map<String,String> opts, String tableOrNamespaceName, boolean wait) throws AccumuloSecurityException, TableExistsException, TableNotFoundException, AccumuloException, NamespaceExistsException, NamespaceNotFoundException { @@ -521,7 +521,7 @@ public class TableOperationsImpl extends TableOperationsHelper { return handleFateOperation(() -> { TFateInstanceType t = FateInstanceType.fromNamespaceOrTableName(tableName).toThrift(); TFateId opid = beginFateOperation(t); - executeFateOperation(opid, FateOperation.TABLE_SPLIT, args, Map.of(), false); + executeFateOperation(opid, TFateOperation.TABLE_SPLIT, args, Map.of(), false); return new Pair<>(opid, splitsForTablet.getValue()); }, tableName); } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException @@ -645,8 +645,8 @@ public class TableOperationsImpl extends TableOperationsHelper { end == null ? EMPTY : TextUtil.getByteBuffer(end)); Map<String,String> opts = new HashMap<>(); try { - doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_MERGE, args, - opts); + doTableFateOperation(tableName, TableNotFoundException.class, TFateOperation.TABLE_MERGE, + args, opts); } catch (TableExistsException e) { // should not happen throw new AssertionError(e); @@ -665,7 +665,7 @@ public class TableOperationsImpl extends TableOperationsHelper { Map<String,String> opts = new HashMap<>(); try { doTableFateOperation(tableName, TableNotFoundException.class, - FateOperation.TABLE_DELETE_RANGE, args, opts); + TFateOperation.TABLE_DELETE_RANGE, args, opts); } catch (TableExistsException e) { // should not happen throw new AssertionError(e); @@ -760,7 +760,7 @@ public class TableOperationsImpl extends TableOperationsHelper { List<ByteBuffer> args = List.of(ByteBuffer.wrap(tableName.getBytes(UTF_8))); Map<String,String> opts = new HashMap<>(); try { - doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_DELETE, + doTableFateOperation(tableName, TableNotFoundException.class, TFateOperation.TABLE_DELETE, args, opts); } catch (TableExistsException e) { // should not happen @@ -800,7 +800,7 @@ public class TableOperationsImpl extends TableOperationsHelper { prependPropertiesToExclude(opts, config.getPropertiesToExclude()); - doTableFateOperation(newTableName, AccumuloException.class, FateOperation.TABLE_CLONE, args, + doTableFateOperation(newTableName, AccumuloException.class, TFateOperation.TABLE_CLONE, args, opts); } @@ -813,7 +813,7 @@ public class TableOperationsImpl extends TableOperationsHelper { List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(oldTableName.getBytes(UTF_8)), ByteBuffer.wrap(newTableName.getBytes(UTF_8))); Map<String,String> opts = new HashMap<>(); - doTableFateOperation(oldTableName, TableNotFoundException.class, FateOperation.TABLE_RENAME, + doTableFateOperation(oldTableName, TableNotFoundException.class, TFateOperation.TABLE_RENAME, args, opts); } @@ -892,7 +892,7 @@ public class TableOperationsImpl extends TableOperationsHelper { Map<String,String> opts = new HashMap<>(); try { - doFateOperation(FateOperation.TABLE_COMPACT, args, opts, tableName, config.getWait()); + doFateOperation(TFateOperation.TABLE_COMPACT, args, opts, tableName, config.getWait()); } catch (TableExistsException | NamespaceExistsException e) { // should not happen throw new AssertionError(e); @@ -912,7 +912,7 @@ public class TableOperationsImpl extends TableOperationsHelper { try { doTableFateOperation(tableName, TableNotFoundException.class, - FateOperation.TABLE_CANCEL_COMPACT, args, opts); + TFateOperation.TABLE_CANCEL_COMPACT, args, opts); } catch (TableExistsException e) { // should not happen throw new AssertionError(e); @@ -1455,17 +1455,17 @@ public class TableOperationsImpl extends TableOperationsHelper { TableId tableId = context.getTableId(tableName); - FateOperation op = null; + TFateOperation op = null; switch (newState) { case OFFLINE: - op = FateOperation.TABLE_OFFLINE; + op = TFateOperation.TABLE_OFFLINE; if (tableName.equals(AccumuloTable.METADATA.tableName()) || tableName.equals(AccumuloTable.ROOT.tableName())) { throw new AccumuloException("Cannot set table to offline state"); } break; case ONLINE: - op = FateOperation.TABLE_ONLINE; + op = TFateOperation.TABLE_ONLINE; if (tableName.equals(AccumuloTable.METADATA.tableName()) || tableName.equals(AccumuloTable.ROOT.tableName())) { // Don't submit a Fate operation for this, these tables can only be online. @@ -1694,7 +1694,7 @@ public class TableOperationsImpl extends TableOperationsHelper { checkedImportDirs.stream().map(s -> s.getBytes(UTF_8)).map(ByteBuffer::wrap).forEach(args::add); try { - doTableFateOperation(tableName, AccumuloException.class, FateOperation.TABLE_IMPORT, args, + doTableFateOperation(tableName, AccumuloException.class, TFateOperation.TABLE_IMPORT, args, Collections.emptyMap()); } catch (TableNotFoundException e) { // should not happen @@ -1727,7 +1727,7 @@ public class TableOperationsImpl extends TableOperationsHelper { Map<String,String> opts = Collections.emptyMap(); try { - doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_EXPORT, + doTableFateOperation(tableName, TableNotFoundException.class, TFateOperation.TABLE_EXPORT, args, opts); } catch (TableExistsException e) { // should not happen @@ -1782,7 +1782,7 @@ public class TableOperationsImpl extends TableOperationsHelper { } private void doTableFateOperation(String tableOrNamespaceName, - Class<? extends Exception> namespaceNotFoundExceptionClass, FateOperation op, + Class<? extends Exception> namespaceNotFoundExceptionClass, TFateOperation op, List<ByteBuffer> args, Map<String,String> opts) throws AccumuloSecurityException, AccumuloException, TableExistsException, TableNotFoundException { try { @@ -2212,7 +2212,7 @@ public class TableOperationsImpl extends TableOperationsHelper { try { doTableFateOperation(tableName, AccumuloException.class, - FateOperation.TABLE_TABLET_AVAILABILITY, args, opts); + TFateOperation.TABLE_TABLET_AVAILABILITY, args, opts); } catch (TableNotFoundException | TableExistsException e) { // should not happen throw new AssertionError(e); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 0fe448d7d5..384099a832 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -76,14 +76,15 @@ public class AdminUtil<T> { private final FateId fateId; private final FateInstanceType instanceType; private final TStatus status; - private final String txName; + private final Fate.FateOperation txName; private final List<String> hlocks; private final List<String> wlocks; private final String top; private final long timeCreated; private TransactionStatus(FateId fateId, FateInstanceType instanceType, TStatus status, - String txName, List<String> hlocks, List<String> wlocks, String top, Long timeCreated) { + Fate.FateOperation txName, List<String> hlocks, List<String> wlocks, String top, + Long timeCreated) { this.fateId = fateId; this.instanceType = instanceType; @@ -115,7 +116,7 @@ public class AdminUtil<T> { /** * @return The name of the transaction running. */ - public String getTxName() { + public Fate.FateOperation getTxName() { return txName; } @@ -361,7 +362,9 @@ public class AdminUtil<T> { fateIds.forEach(fateId -> { ReadOnlyFateTxStore<T> txStore = store.read(fateId); - String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME); + // tx name will not be set if the tx is not seeded with work (it is NEW) + Fate.FateOperation txName = txStore.getTransactionInfo(Fate.TxInfo.TX_NAME) == null ? null + : ((Fate.FateOperation) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME)); List<String> hlocks = heldLocks.remove(fateId); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index a45cb3a405..de6e7073ec 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -54,6 +54,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.fate.FateStore.FateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.logging.FateLogger; +import org.apache.accumulo.core.manager.thrift.TFateOperation; import org.apache.accumulo.core.util.ShutdownUtil; import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; @@ -88,6 +89,53 @@ public class Fate<T> { TX_NAME, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE } + public enum FateOperation { + COMMIT_COMPACTION(null), + NAMESPACE_CREATE(TFateOperation.NAMESPACE_CREATE), + NAMESPACE_DELETE(TFateOperation.NAMESPACE_DELETE), + NAMESPACE_RENAME(TFateOperation.NAMESPACE_RENAME), + SHUTDOWN_TSERVER(null), + SYSTEM_SPLIT(null), + TABLE_BULK_IMPORT2(TFateOperation.TABLE_BULK_IMPORT2), + TABLE_CANCEL_COMPACT(TFateOperation.TABLE_CANCEL_COMPACT), + TABLE_CLONE(TFateOperation.TABLE_CLONE), + TABLE_COMPACT(TFateOperation.TABLE_COMPACT), + TABLE_CREATE(TFateOperation.TABLE_CREATE), + TABLE_DELETE(TFateOperation.TABLE_DELETE), + TABLE_DELETE_RANGE(TFateOperation.TABLE_DELETE_RANGE), + TABLE_EXPORT(TFateOperation.TABLE_EXPORT), + TABLE_IMPORT(TFateOperation.TABLE_IMPORT), + TABLE_MERGE(TFateOperation.TABLE_MERGE), + TABLE_OFFLINE(TFateOperation.TABLE_OFFLINE), + TABLE_ONLINE(TFateOperation.TABLE_ONLINE), + TABLE_RENAME(TFateOperation.TABLE_RENAME), + TABLE_SPLIT(TFateOperation.TABLE_SPLIT), + TABLE_TABLET_AVAILABILITY(TFateOperation.TABLE_TABLET_AVAILABILITY); + + private final TFateOperation top; + private static final EnumSet<FateOperation> nonThriftOps = + EnumSet.of(COMMIT_COMPACTION, SHUTDOWN_TSERVER, SYSTEM_SPLIT); + + FateOperation(TFateOperation top) { + this.top = top; + } + + public static FateOperation fromThrift(TFateOperation top) { + return FateOperation.valueOf(top.name()); + } + + public static EnumSet<FateOperation> getNonThriftOps() { + return nonThriftOps; + } + + public TFateOperation toThrift() { + if (top == null) { + throw new IllegalStateException(this + " does not have an equivalent thrift form"); + } + return top; + } + } + /** * A single thread that finds transactions to work on and queues them up. Do not want each worker * thread going to the store and looking for work as it would place more load on the store. @@ -437,14 +485,15 @@ public class Fate<T> { return store.create(); } - public void seedTransaction(String txName, FateKey fateKey, Repo<T> repo, boolean autoCleanUp) { + public void seedTransaction(FateOperation txName, FateKey fateKey, Repo<T> repo, + boolean autoCleanUp) { store.seedTransaction(txName, fateKey, repo, autoCleanUp); } // start work in the transaction.. it is safe to call this // multiple times for a transaction... but it will only seed once - public void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp, - String goalMessage) { + public void seedTransaction(FateOperation txName, FateId fateId, Repo<T> repo, + boolean autoCleanUp, String goalMessage) { log.info("Seeding {} {}", fateId, goalMessage); store.seedTransaction(txName, fateId, repo, autoCleanUp); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index d434770461..160d872712 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -50,8 +50,8 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> { FateId create(); /** - * Seeds a transaction with the given repo if it does not exists. A fateId will be derived from - * the fateKey. If seeded, sets the following data for the fateId in the store. + * Seeds a transaction with the given repo if it does not exist. A fateId will be derived from the + * fateKey. If seeded, sets the following data for the fateId in the store. * * <ul> * <li>Set the tx name</li> @@ -66,7 +66,7 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> { * empty optional otherwise. If there was a failure this could return an empty optional * when it actually succeeded. */ - Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T> repo, + Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateKey, Repo<T> repo, boolean autoCleanUp); /** @@ -84,7 +84,8 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> { * failures. When there are no failures returns true if seeded and false otherwise. If * there was a failure this could return false when it actually succeeded. */ - boolean seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp); + boolean seedTransaction(Fate.FateOperation txName, FateId fateId, Repo<T> repo, + boolean autoCleanUp); /** * An interface that allows read/write access to the data related to a single fate operation. diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index 195848e276..3d7c039e0f 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -40,6 +40,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.Fate.TxInfo; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; @@ -126,7 +127,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { } @Override - public Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T> repo, + public Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateKey, Repo<T> repo, boolean autoCleanUp) { final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); Supplier<FateMutator<T>> mutatorFactory = () -> newMutator(fateId).requireAbsent() @@ -139,14 +140,15 @@ public class UserFateStore<T> extends AbstractFateStore<T> { } @Override - public boolean seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp) { + public boolean seedTransaction(Fate.FateOperation txName, FateId fateId, Repo<T> repo, + boolean autoCleanUp) { Supplier<FateMutator<T>> mutatorFactory = () -> newMutator(fateId).requireStatus(TStatus.NEW).requireUnreserved().requireAbsentKey(); return seedTransaction(mutatorFactory, fateId.canonical(), txName, repo, autoCleanUp); } private boolean seedTransaction(Supplier<FateMutator<T>> mutatorFactory, String logId, - String txName, Repo<T> repo, boolean autoCleanUp) { + Fate.FateOperation txName, Repo<T> repo, boolean autoCleanUp) { int maxAttempts = 5; for (int attempt = 0; attempt < maxAttempts; attempt++) { var mutator = mutatorFactory.get(); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java index 4a691417c6..e639ac5712 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java @@ -172,7 +172,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { } @Override - public Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T> repo, + public Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateKey, Repo<T> repo, boolean autoCleanUp) { return createAndReserve(fateKey).map(txStore -> { try { @@ -185,7 +185,8 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { } @Override - public boolean seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp) { + public boolean seedTransaction(Fate.FateOperation txName, FateId fateId, Repo<T> repo, + boolean autoCleanUp) { return tryReserve(fateId).map(txStore -> { try { if (txStore.getStatus() == NEW) { @@ -199,7 +200,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { }).orElse(false); } - private void seedTransaction(String txName, Repo<T> repo, boolean autoCleanUp, + private void seedTransaction(Fate.FateOperation txName, Repo<T> repo, boolean autoCleanUp, FateTxStore<T> txStore) { if (txStore.top() == null) { try { diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index 9a5984f4ed..d25e87f15e 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -150,8 +150,8 @@ public class FateLogger { } @Override - public Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T> repo, - boolean autoCleanUp) { + public Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateKey, + Repo<T> repo, boolean autoCleanUp) { var optional = store.seedTransaction(txName, fateKey, repo, autoCleanUp); if (storeLog.isTraceEnabled()) { optional.ifPresentOrElse(fateId -> { @@ -166,7 +166,7 @@ public class FateLogger { } @Override - public boolean seedTransaction(String txName, FateId fateId, Repo<T> repo, + public boolean seedTransaction(Fate.FateOperation txName, FateId fateId, Repo<T> repo, boolean autoCleanUp) { boolean seeded = store.seedTransaction(txName, fateId, repo, autoCleanUp); if (storeLog.isTraceEnabled()) { diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateService.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateService.java index b38fb423bc..9fd6e7808f 100644 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateService.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateService.java @@ -31,7 +31,7 @@ public class FateService { public TFateId beginFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateInstanceType type) throws org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException, org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException, org.apache.thrift.TException; - public void executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId opid, FateOperation op, java.util.List<java.nio.ByteBuffer> arguments, java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean) throws org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException, org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException, org.apache.accumulo.cor [...] + public void executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId opid, TFateOperation op, java.util.List<java.nio.ByteBuffer> arguments, java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean) throws org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException, org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException, org.apache.accumulo.co [...] public java.lang.String waitForFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId opid) throws org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException, org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException, org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException, org.apache.thrift.TException; @@ -45,7 +45,7 @@ public class FateService { public void beginFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateInstanceType type, org.apache.thrift.async.AsyncMethodCallback<TFateId> resultHandler) throws org.apache.thrift.TException; - public void executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId opid, FateOperation op, java.util.List<java.nio.ByteBuffer> arguments, java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException; + public void executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId opid, TFateOperation op, java.util.List<java.nio.ByteBuffer> arguments, java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException; public void waitForFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId opid, org.apache.thrift.async.AsyncMethodCallback<java.lang.String> resultHandler) throws org.apache.thrift.TException; @@ -110,13 +110,13 @@ public class FateService { } @Override - public void executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId opid, FateOperation op, java.util.List<java.nio.ByteBuffer> arguments, java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean) throws org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException, org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException, org.apache.accumulo.cor [...] + public void executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId opid, TFateOperation op, java.util.List<java.nio.ByteBuffer> arguments, java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean) throws org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException, org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException, org.apache.accumulo.co [...] { send_executeFateOperation(tinfo, credentials, opid, op, arguments, options, autoClean); recv_executeFateOperation(); } - public void send_executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId opid, FateOperation op, java.util.List<java.nio.ByteBuffer> arguments, java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean) throws org.apache.thrift.TException + public void send_executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId opid, TFateOperation op, java.util.List<java.nio.ByteBuffer> arguments, java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean) throws org.apache.thrift.TException { executeFateOperation_args args = new executeFateOperation_args(); args.setTinfo(tinfo); @@ -302,7 +302,7 @@ public class FateService { } @Override - public void executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId opid, FateOperation op, java.util.List<java.nio.ByteBuffer> arguments, java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException { + public void executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId opid, TFateOperation op, java.util.List<java.nio.ByteBuffer> arguments, java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException { checkReady(); executeFateOperation_call method_call = new executeFateOperation_call(tinfo, credentials, opid, op, arguments, options, autoClean, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; @@ -313,11 +313,11 @@ public class FateService { private org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo; private org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials; private TFateId opid; - private FateOperation op; + private TFateOperation op; private java.util.List<java.nio.ByteBuffer> arguments; private java.util.Map<java.lang.String,java.lang.String> options; private boolean autoClean; - public executeFateOperation_call(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId opid, FateOperation op, java.util.List<java.nio.ByteBuffer> arguments, java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, [...] + public executeFateOperation_call(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId opid, TFateOperation op, java.util.List<java.nio.ByteBuffer> arguments, java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory [...] super(client, protocolFactory, transport, resultHandler, false); this.tinfo = tinfo; this.credentials = credentials; @@ -2312,9 +2312,9 @@ public class FateService { public @org.apache.thrift.annotation.Nullable TFateId opid; // required /** * - * @see FateOperation + * @see TFateOperation */ - public @org.apache.thrift.annotation.Nullable FateOperation op; // required + public @org.apache.thrift.annotation.Nullable TFateOperation op; // required public @org.apache.thrift.annotation.Nullable java.util.List<java.nio.ByteBuffer> arguments; // required public @org.apache.thrift.annotation.Nullable java.util.Map<java.lang.String,java.lang.String> options; // required public boolean autoClean; // required @@ -2326,7 +2326,7 @@ public class FateService { OPID((short)3, "opid"), /** * - * @see FateOperation + * @see TFateOperation */ OP((short)4, "op"), ARGUMENTS((short)5, "arguments"), @@ -2416,7 +2416,7 @@ public class FateService { tmpMap.put(_Fields.OPID, new org.apache.thrift.meta_data.FieldMetaData("opid", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TFateId.class))); tmpMap.put(_Fields.OP, new org.apache.thrift.meta_data.FieldMetaData("op", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, FateOperation.class))); + new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, TFateOperation.class))); tmpMap.put(_Fields.ARGUMENTS, new org.apache.thrift.meta_data.FieldMetaData("arguments", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true)))); @@ -2437,7 +2437,7 @@ public class FateService { org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId opid, - FateOperation op, + TFateOperation op, java.util.List<java.nio.ByteBuffer> arguments, java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean) @@ -2575,18 +2575,18 @@ public class FateService { /** * - * @see FateOperation + * @see TFateOperation */ @org.apache.thrift.annotation.Nullable - public FateOperation getOp() { + public TFateOperation getOp() { return this.op; } /** * - * @see FateOperation + * @see TFateOperation */ - public executeFateOperation_args setOp(@org.apache.thrift.annotation.Nullable FateOperation op) { + public executeFateOperation_args setOp(@org.apache.thrift.annotation.Nullable TFateOperation op) { this.op = op; return this; } @@ -2737,7 +2737,7 @@ public class FateService { if (value == null) { unsetOp(); } else { - setOp((FateOperation)value); + setOp((TFateOperation)value); } break; @@ -3173,7 +3173,7 @@ public class FateService { break; case 4: // OP if (schemeField.type == org.apache.thrift.protocol.TType.I32) { - struct.op = org.apache.accumulo.core.manager.thrift.FateOperation.findByValue(iprot.readI32()); + struct.op = org.apache.accumulo.core.manager.thrift.TFateOperation.findByValue(iprot.readI32()); struct.setOpIsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); @@ -3386,7 +3386,7 @@ public class FateService { struct.setOpidIsSet(true); } if (incoming.get(3)) { - struct.op = org.apache.accumulo.core.manager.thrift.FateOperation.findByValue(iprot.readI32()); + struct.op = org.apache.accumulo.core.manager.thrift.TFateOperation.findByValue(iprot.readI32()); struct.setOpIsSet(true); } if (incoming.get(4)) { diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateOperation.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/TFateOperation.java similarity index 91% rename from core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateOperation.java rename to core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/TFateOperation.java index d03349ede0..b9cd11ca85 100644 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateOperation.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/TFateOperation.java @@ -25,7 +25,7 @@ package org.apache.accumulo.core.manager.thrift; -public enum FateOperation implements org.apache.thrift.TEnum { +public enum TFateOperation implements org.apache.thrift.TEnum { TABLE_CREATE(0), TABLE_CLONE(1), TABLE_DELETE(2), @@ -34,7 +34,6 @@ public enum FateOperation implements org.apache.thrift.TEnum { TABLE_OFFLINE(5), TABLE_MERGE(6), TABLE_DELETE_RANGE(7), - OBSOLETE_TABLE_BULK_IMPORT(8), TABLE_COMPACT(9), TABLE_IMPORT(10), TABLE_EXPORT(11), @@ -48,7 +47,7 @@ public enum FateOperation implements org.apache.thrift.TEnum { private final int value; - private FateOperation(int value) { + private TFateOperation(int value) { this.value = value; } @@ -65,7 +64,7 @@ public enum FateOperation implements org.apache.thrift.TEnum { * @return null if the value is not found. */ @org.apache.thrift.annotation.Nullable - public static FateOperation findByValue(int value) { + public static TFateOperation findByValue(int value) { switch (value) { case 0: return TABLE_CREATE; @@ -83,8 +82,6 @@ public enum FateOperation implements org.apache.thrift.TEnum { return TABLE_MERGE; case 7: return TABLE_DELETE_RANGE; - case 8: - return OBSOLETE_TABLE_BULK_IMPORT; case 9: return TABLE_COMPACT; case 10: diff --git a/core/src/main/thrift/manager.thrift b/core/src/main/thrift/manager.thrift index 11e2f78353..08d93dc185 100644 --- a/core/src/main/thrift/manager.thrift +++ b/core/src/main/thrift/manager.thrift @@ -49,26 +49,26 @@ enum TabletLoadState { UNLOAD_ERROR } -enum FateOperation { - TABLE_CREATE - TABLE_CLONE - TABLE_DELETE - TABLE_RENAME - TABLE_ONLINE - TABLE_OFFLINE - TABLE_MERGE - TABLE_DELETE_RANGE - OBSOLETE_TABLE_BULK_IMPORT - TABLE_COMPACT - TABLE_IMPORT - TABLE_EXPORT - TABLE_CANCEL_COMPACT - NAMESPACE_CREATE - NAMESPACE_DELETE - NAMESPACE_RENAME - TABLE_BULK_IMPORT2 - TABLE_TABLET_AVAILABILITY - TABLE_SPLIT +enum TFateOperation { + TABLE_CREATE = 0 + TABLE_CLONE = 1 + TABLE_DELETE = 2 + TABLE_RENAME = 3 + TABLE_ONLINE = 4 + TABLE_OFFLINE = 5 + TABLE_MERGE = 6 + TABLE_DELETE_RANGE = 7 + // 8 was bulk v1 that was removed + TABLE_COMPACT = 9 + TABLE_IMPORT = 10 + TABLE_EXPORT = 11 + TABLE_CANCEL_COMPACT = 12 + NAMESPACE_CREATE = 13 + NAMESPACE_DELETE = 14 + NAMESPACE_RENAME = 15 + TABLE_BULK_IMPORT2 = 16 + TABLE_TABLET_AVAILABILITY = 17 + TABLE_SPLIT = 18 } enum ManagerState { @@ -192,7 +192,7 @@ service FateService { 1:client.TInfo tinfo 2:security.TCredentials credentials 3:TFateId opid - 4:FateOperation op + 4:TFateOperation op 5:list<binary> arguments 6:map<string, string> options 7:bool autoClean diff --git a/core/src/test/java/org/apache/accumulo/core/fate/FateOperationTest.java b/core/src/test/java/org/apache/accumulo/core/fate/FateOperationTest.java new file mode 100644 index 0000000000..fd34447703 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/fate/FateOperationTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.accumulo.core.manager.thrift.TFateOperation; +import org.junit.jupiter.api.Test; + +public class FateOperationTest { + + @Test + public void testFateOperation() { + // ensures that all TFateOperation have an equivalent FateOperation + assertTrue(TFateOperation.values().length > 0); + for (var top : TFateOperation.values()) { + assertEquals(top, Fate.FateOperation.fromThrift(top).toThrift()); + } + // ensures that all FateOperation are valid: either specified not to have an equivalent thrift + // form or do have an equivalent thrift form + assertTrue(Fate.FateOperation.values().length > 0); + for (var op : Fate.FateOperation.values()) { + if (Fate.FateOperation.getNonThriftOps().contains(op)) { + assertThrows(IllegalStateException.class, op::toThrift); + } else { + assertEquals(op, Fate.FateOperation.fromThrift(op.toThrift())); + } + } + } +} diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 40d0d755b1..1d2389d6fb 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -53,13 +53,13 @@ public class TestStore implements FateStore<String> { } @Override - public Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<String> repo, - boolean autoCleanUp) { + public Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateKey, + Repo<String> repo, boolean autoCleanUp) { return Optional.empty(); } @Override - public boolean seedTransaction(String txName, FateId fateId, Repo<String> repo, + public boolean seedTransaction(Fate.FateOperation txName, FateId fateId, Repo<String> repo, boolean autoCleanUp) { return false; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java index a89fb39689..0d10b2025f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java @@ -38,7 +38,7 @@ import org.apache.accumulo.core.dataImpl.thrift.IterInfo; import org.apache.accumulo.core.dataImpl.thrift.TColumn; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TRange; -import org.apache.accumulo.core.manager.thrift.FateOperation; +import org.apache.accumulo.core.manager.thrift.TFateOperation; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.SystemPermission; @@ -685,14 +685,14 @@ public class AuditedSecurityOperation extends SecurityOperation { "action: %s; targetTable: %s:%s"; @Override - public boolean canChangeTableState(TCredentials credentials, TableId tableId, FateOperation op, + public boolean canChangeTableState(TCredentials credentials, TableId tableId, TFateOperation op, NamespaceId namespaceId) throws ThriftSecurityException { String tableName = getTableName(tableId); String operation = null; - if (op == FateOperation.TABLE_ONLINE) { + if (op == TFateOperation.TABLE_ONLINE) { operation = "onlineTable"; } - if (op == FateOperation.TABLE_OFFLINE) { + if (op == TFateOperation.TABLE_OFFLINE) { operation = "offlineTable"; } try { diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java index e1fd176bae..493d4570a1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java @@ -42,7 +42,7 @@ import org.apache.accumulo.core.dataImpl.thrift.TColumn; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TRange; import org.apache.accumulo.core.fate.zookeeper.ZooCache; -import org.apache.accumulo.core.manager.thrift.FateOperation; +import org.apache.accumulo.core.manager.thrift.TFateOperation; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.NamespacePermission; @@ -524,7 +524,7 @@ public class SecurityOperation { || hasTablePermission(c, tableId, namespaceId, TablePermission.DROP_TABLE, false); } - public boolean canChangeTableState(TCredentials c, TableId tableId, FateOperation op, + public boolean canChangeTableState(TCredentials c, TableId tableId, TFateOperation op, NamespaceId namespaceId) throws ThriftSecurityException { authenticate(c); return hasSystemPermissionWithNamespaceId(c, SystemPermission.SYSTEM, namespaceId, false) diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java index f17695265a..13b252e693 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java @@ -33,6 +33,7 @@ import java.util.TreeMap; import java.util.TreeSet; import org.apache.accumulo.core.fate.AdminUtil; +import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.ReadOnlyFateStore; @@ -87,8 +88,9 @@ public class FateSummaryReport { } String top = txnStatus.getTop(); stepCounts.merge(Objects.requireNonNullElse(top, "?"), 1, Integer::sum); - String runningRepo = txnStatus.getTxName(); - cmdCounts.merge(Objects.requireNonNullElse(runningRepo, "?"), 1, Integer::sum); + Fate.FateOperation runningRepo = txnStatus.getTxName(); + + cmdCounts.merge(runningRepo == null ? "?" : runningRepo.name(), 1, Integer::sum); // filter transactions if provided if (!fateIdFilter.isEmpty() && !fateIdFilter.contains(txnStatus.getFateId().canonical())) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java index ddf5beac8f..8d1218e618 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java @@ -74,7 +74,7 @@ public class FateTxnDetails implements Comparable<FateTxnDetails> { step = txnStatus.getTop(); } if (txnStatus.getTxName() != null) { - txName = txnStatus.getTxName(); + txName = txnStatus.getTxName().name(); } if (txnStatus.getFateId() != null) { fateId = txnStatus.getFateId().canonical(); diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java index fb3d77a706..c8f2573bab 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java @@ -34,6 +34,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.fate.AdminUtil; +import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.junit.jupiter.api.Test; @@ -57,7 +58,7 @@ class TxnDetailsTest { expect(status1.getTimeCreated()).andReturn(now - TimeUnit.DAYS.toMillis(1)).anyTimes(); expect(status1.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes(); expect(status1.getTop()).andReturn("step1").anyTimes(); - expect(status1.getTxName()).andReturn("runningTx1").anyTimes(); + expect(status1.getTxName()).andReturn(Fate.FateOperation.TABLE_CREATE).anyTimes(); expect(status1.getFateId()).andReturn(FateId.from("FATE:USER:" + uuid1)).anyTimes(); expect(status1.getHeldLocks()).andReturn(List.of()).anyTimes(); expect(status1.getWaitingLocks()).andReturn(List.of()).anyTimes(); @@ -66,7 +67,7 @@ class TxnDetailsTest { expect(status2.getTimeCreated()).andReturn(now - TimeUnit.DAYS.toMillis(7)).anyTimes(); expect(status2.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes(); expect(status2.getTop()).andReturn("step2").anyTimes(); - expect(status2.getTxName()).andReturn("runningTx2").anyTimes(); + expect(status2.getTxName()).andReturn(Fate.FateOperation.TABLE_DELETE).anyTimes(); expect(status2.getFateId()).andReturn(FateId.from("FATE:USER:" + uuid2)).anyTimes(); expect(status2.getHeldLocks()).andReturn(List.of()).anyTimes(); expect(status2.getWaitingLocks()).andReturn(List.of()).anyTimes(); @@ -100,7 +101,7 @@ class TxnDetailsTest { expect(status1.getTimeCreated()).andReturn(now - TimeUnit.DAYS.toMillis(1)).anyTimes(); expect(status1.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes(); expect(status1.getTop()).andReturn("step1").anyTimes(); - expect(status1.getTxName()).andReturn("runningTx").anyTimes(); + expect(status1.getTxName()).andReturn(Fate.FateOperation.TABLE_COMPACT).anyTimes(); expect(status1.getFateId()).andReturn(FateId.from("FATE:USER:" + UUID.randomUUID())).anyTimes(); // incomplete lock info (W unknown ns id, no table)) expect(status1.getHeldLocks()).andReturn(List.of("R:1", "R:2", "W:a")).anyTimes(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java index 32303ade9d..de1bae81a8 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java @@ -69,15 +69,16 @@ import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TRange; +import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.manager.thrift.BulkImportState; -import org.apache.accumulo.core.manager.thrift.FateOperation; import org.apache.accumulo.core.manager.thrift.FateService; import org.apache.accumulo.core.manager.thrift.TFateId; import org.apache.accumulo.core.manager.thrift.TFateInstanceType; +import org.apache.accumulo.core.manager.thrift.TFateOperation; import org.apache.accumulo.core.manager.thrift.ThriftPropertyException; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.util.ByteBufferUtil; @@ -131,10 +132,11 @@ class FateServiceHandler implements FateService.Iface { } @Override - public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, FateOperation op, + public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, TFateOperation top, List<ByteBuffer> arguments, Map<String,String> options, boolean autoCleanup) throws ThriftSecurityException, ThriftTableOperationException, ThriftPropertyException { authenticate(c); + Fate.FateOperation op = Fate.FateOperation.fromThrift(top); String goalMessage = op.toString() + " "; String txUUIDStr = opid.getTxUUIDStr(); FateInstanceType type = FateInstanceType.fromThrift(opid.getType()); @@ -151,7 +153,7 @@ class FateServiceHandler implements FateService.Iface { } goalMessage += "Create " + namespace + " namespace."; - manager.fate(type).seedTransaction(op.toString(), fateId, + manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(new CreateNamespace(c.getPrincipal(), namespace, options)), autoCleanup, goalMessage); break; @@ -170,7 +172,7 @@ class FateServiceHandler implements FateService.Iface { } goalMessage += "Rename " + oldName + " namespace to " + newName; - manager.fate(type).seedTransaction(op.toString(), fateId, + manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(new RenameNamespace(namespaceId, oldName, newName)), autoCleanup, goalMessage); break; @@ -188,7 +190,7 @@ class FateServiceHandler implements FateService.Iface { } goalMessage += "Delete namespace Id: " + namespaceId; - manager.fate(type).seedTransaction(op.toString(), fateId, + manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(new DeleteNamespace(namespaceId)), autoCleanup, goalMessage); break; } @@ -251,7 +253,7 @@ class FateServiceHandler implements FateService.Iface { goalMessage += "Create table " + tableName + " " + initialTableState + " with " + splitCount + " splits and initial tabletAvailability of " + initialTabletAvailability; - manager.fate(type).seedTransaction(op.toString(), fateId, + manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(new CreateTable(c.getPrincipal(), tableName, timeType, options, splitsPath, splitCount, splitsDirsPath, initialTableState, initialTabletAvailability, namespaceId)), @@ -287,7 +289,7 @@ class FateServiceHandler implements FateService.Iface { goalMessage += "Rename table " + oldTableName + "(" + tableId + ") to " + oldTableName; try { - manager.fate(type).seedTransaction(op.toString(), fateId, + manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(new RenameTable(namespaceId, tableId, oldTableName, newTableName)), autoCleanup, goalMessage); } catch (NamespaceNotFoundException e) { @@ -359,8 +361,8 @@ class FateServiceHandler implements FateService.Iface { } manager.fate(type).seedTransaction( - op.toString(), fateId, new TraceRepo<>(new CloneTable(c.getPrincipal(), namespaceId, - srcTableId, tableName, propertiesToSet, propertiesToExclude, keepOffline)), + op, fateId, new TraceRepo<>(new CloneTable(c.getPrincipal(), namespaceId, srcTableId, + tableName, propertiesToSet, propertiesToExclude, keepOffline)), autoCleanup, goalMessage); break; @@ -388,7 +390,7 @@ class FateServiceHandler implements FateService.Iface { } goalMessage += "Delete table " + tableName + "(" + tableId + ")"; - manager.fate(type).seedTransaction(op.toString(), fateId, + manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(new PreDeleteTable(namespaceId, tableId)), autoCleanup, goalMessage); break; } @@ -400,7 +402,8 @@ class FateServiceHandler implements FateService.Iface { final boolean canOnlineOfflineTable; try { - canOnlineOfflineTable = manager.security.canChangeTableState(c, tableId, op, namespaceId); + canOnlineOfflineTable = + manager.security.canChangeTableState(c, tableId, top, namespaceId); } catch (ThriftSecurityException e) { throwIfTableMissingSecurityException(e, tableId, null, TableOperation.ONLINE); throw e; @@ -413,7 +416,7 @@ class FateServiceHandler implements FateService.Iface { goalMessage += "Online table " + tableId; final EnumSet<TableState> expectedCurrStates = EnumSet.of(TableState.ONLINE, TableState.OFFLINE); - manager.fate(type).seedTransaction(op.toString(), fateId, + manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>( new ChangeTableState(namespaceId, tableId, tableOp, expectedCurrStates)), autoCleanup, goalMessage); @@ -428,7 +431,8 @@ class FateServiceHandler implements FateService.Iface { final boolean canOnlineOfflineTable; try { - canOnlineOfflineTable = manager.security.canChangeTableState(c, tableId, op, namespaceId); + canOnlineOfflineTable = + manager.security.canChangeTableState(c, tableId, top, namespaceId); } catch (ThriftSecurityException e) { throwIfTableMissingSecurityException(e, tableId, null, TableOperation.OFFLINE); throw e; @@ -441,7 +445,7 @@ class FateServiceHandler implements FateService.Iface { goalMessage += "Offline table " + tableId; final EnumSet<TableState> expectedCurrStates = EnumSet.of(TableState.ONLINE, TableState.OFFLINE); - manager.fate(type).seedTransaction(op.toString(), fateId, + manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>( new ChangeTableState(namespaceId, tableId, tableOp, expectedCurrStates)), autoCleanup, goalMessage); @@ -477,7 +481,7 @@ class FateServiceHandler implements FateService.Iface { startRowStr, endRowStr); goalMessage += "Merge table " + tableName + "(" + tableId + ") splits from " + startRowStr + " to " + endRowStr; - manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>( + manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>( new TableRangeOp(MergeInfo.Operation.MERGE, namespaceId, tableId, startRow, endRow)), autoCleanup, goalMessage); break; @@ -509,7 +513,7 @@ class FateServiceHandler implements FateService.Iface { goalMessage += "Delete table " + tableName + "(" + tableId + ") range " + startRow + " to " + endRow; - manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>( + manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>( new TableRangeOp(MergeInfo.Operation.DELETE, namespaceId, tableId, startRow, endRow)), autoCleanup, goalMessage); break; @@ -535,7 +539,7 @@ class FateServiceHandler implements FateService.Iface { } goalMessage += "Compact table (" + tableId + ") with config " + compactionConfig; - manager.fate(type).seedTransaction(op.toString(), fateId, + manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(new CompactRange(namespaceId, tableId, compactionConfig)), autoCleanup, goalMessage); break; @@ -559,7 +563,7 @@ class FateServiceHandler implements FateService.Iface { } goalMessage += "Cancel compaction of table (" + tableId + ")"; - manager.fate(type).seedTransaction(op.toString(), fateId, + manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(new CancelCompactions(namespaceId, tableId)), autoCleanup, goalMessage); break; } @@ -600,10 +604,10 @@ class FateServiceHandler implements FateService.Iface { } goalMessage += "Import table with new name: " + tableName + " from " + exportDirs; - manager.fate(type).seedTransaction(op.toString(), fateId, - new TraceRepo<>(new ImportTable(c.getPrincipal(), tableName, exportDirs, namespaceId, - keepMappings, keepOffline)), - autoCleanup, goalMessage); + manager.fate(type) + .seedTransaction(op, fateId, new TraceRepo<>(new ImportTable(c.getPrincipal(), + tableName, exportDirs, namespaceId, keepMappings, keepOffline)), autoCleanup, + goalMessage); break; } case TABLE_EXPORT: { @@ -630,7 +634,7 @@ class FateServiceHandler implements FateService.Iface { } goalMessage += "Export table " + tableName + "(" + tableId + ") to " + exportDir; - manager.fate(type).seedTransaction(op.toString(), fateId, + manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(new ExportTable(namespaceId, tableName, tableId, exportDir)), autoCleanup, goalMessage); break; @@ -667,7 +671,7 @@ class FateServiceHandler implements FateService.Iface { manager.updateBulkImportStatus(dir, BulkImportState.INITIAL); goalMessage += "Bulk import (v2) " + dir + " to " + tableName + "(" + tableId + ")"; - manager.fate(type).seedTransaction(op.toString(), fateId, + manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(new PrepBulkImport(tableId, dir, setTime)), autoCleanup, goalMessage); break; } @@ -710,7 +714,7 @@ class FateServiceHandler implements FateService.Iface { goalMessage += "Set availability for table: " + tableName + "(" + tableId + ") range: " + tRange + " to: " + tabletAvailability.name(); - manager.fate(type).seedTransaction(op.toString(), fateId, + manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(new LockTable(tableId, namespaceId, tRange, tabletAvailability)), autoCleanup, goalMessage); break; @@ -781,8 +785,8 @@ class FateServiceHandler implements FateService.Iface { } goalMessage = "Splitting " + extent + " for user into " + (splits.size() + 1) + " tablets"; - manager.fate(type).seedTransaction(op.toString(), fateId, new PreSplit(extent, splits), - autoCleanup, goalMessage); + manager.fate(type).seedTransaction(op, fateId, new PreSplit(extent, splits), autoCleanup, + goalMessage); break; } default: diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java index a669798e8b..63137097a2 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java @@ -321,7 +321,7 @@ public class ManagerClientServiceHandler implements ManagerClientService.Iface { String msg = "Shutdown tserver " + tabletServer; - fate.seedTransaction("ShutdownTServer", fateId, + fate.seedTransaction(Fate.FateOperation.SHUTDOWN_TSERVER, fateId, new TraceRepo<>( new ShutdownTServer(doomed, manager.tserverSet.getResourceGroup(doomed), force)), false, msg); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index fce920e4cb..e3bec68c82 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -839,8 +839,8 @@ public class CompactionCoordinator // Start a fate transaction to commit the compaction. CompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid); var renameOp = new RenameCompactionFile(new CompactionCommitData(ecid, extent, ecm, stats)); - localFate.seedTransaction("COMMIT_COMPACTION", FateKey.forCompactionCommit(ecid), renameOp, - true); + localFate.seedTransaction(Fate.FateOperation.COMMIT_COMPACTION, + FateKey.forCompactionCommit(ecid), renameOp, true); } @Override diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java index 19ba624306..473f1284a5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.TreeMap; import org.apache.accumulo.core.fate.AdminUtil; +import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; @@ -101,11 +102,9 @@ public abstract class FateMetricValues { // incr count for op type for for in_progress transactions. if (ReadOnlyFateStore.TStatus.IN_PROGRESS.equals(tx.getStatus())) { - String opType = tx.getTxName(); - if (opType == null || opType.isEmpty()) { - opType = "UNKNOWN"; - } - opTypeCounters.merge(opType, 1L, Long::sum); + Fate.FateOperation opType = tx.getTxName(); + String opTypeStr = opType == null ? "UNKNOWN" : opType.name(); + opTypeCounters.merge(opTypeStr, 1L, Long::sum); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java index 7b56ea4388..8270bc423f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java @@ -19,6 +19,7 @@ package org.apache.accumulo.manager.split; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.manager.Manager; @@ -41,8 +42,8 @@ public class SeedSplitTask implements Runnable { public void run() { try { var fateInstanceType = FateInstanceType.fromTableId((extent.tableId())); - manager.fate(fateInstanceType).seedTransaction("SYSTEM_SPLIT", FateKey.forSplit(extent), - new FindSplits(extent), true); + manager.fate(fateInstanceType).seedTransaction(Fate.FateOperation.SYSTEM_SPLIT, + FateKey.forSplit(extent), new FindSplits(extent), true); } catch (Exception e) { log.error("Failed to split {}", extent, e); } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index c8905cd850..3d6a0e3dba 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -74,6 +74,7 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; @@ -363,7 +364,7 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { // should never run. Its purpose is to prevent the dead compaction detector // from deleting the id. Repo<Manager> repo = new FakeRepo(); - var fateId = fateStore.seedTransaction("COMPACTION_COMMIT", + var fateId = fateStore.seedTransaction(Fate.FateOperation.COMMIT_COMPACTION, FateKey.forCompactionCommit(allCids.get(tableId).get(0)), repo, true).orElseThrow(); // Read the tablet metadata diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java index f493292368..e7b3e073c9 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java @@ -25,6 +25,7 @@ import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRES import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW; import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN; +import static org.apache.accumulo.test.fate.FateStoreUtil.TEST_FATE_OP; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -244,7 +245,7 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu FateId fateId = fate.startTransaction(); assertEquals(TStatus.NEW, getTxStatus(sctx, fateId)); - fate.seedTransaction("TestOperation", fateId, new TestRepo("testTransactionStatus"), true, + fate.seedTransaction(TEST_FATE_OP, fateId, new TestRepo("testTransactionStatus"), true, "Test Op"); assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, fateId)); // wait for call() to be called @@ -306,7 +307,7 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu assertTrue(fate.cancel(fateId)); assertTrue( FAILED_IN_PROGRESS == getTxStatus(sctx, fateId) || FAILED == getTxStatus(sctx, fateId)); - fate.seedTransaction("TestOperation", fateId, new TestRepo("testCancelWhileNew"), true, + fate.seedTransaction(TEST_FATE_OP, fateId, new TestRepo("testCancelWhileNew"), true, "Test Op"); Wait.waitFor(() -> FAILED == getTxStatus(sctx, fateId)); // nothing should have run @@ -337,8 +338,8 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu FateId fateId = fate.startTransaction(); LOG.debug("Starting test testCancelWhileSubmitted with {}", fateId); assertEquals(NEW, getTxStatus(sctx, fateId)); - fate.seedTransaction("TestOperation", fateId, - new TestRepo("testCancelWhileSubmittedAndRunning"), false, "Test Op"); + fate.seedTransaction(TEST_FATE_OP, fateId, new TestRepo("testCancelWhileSubmittedAndRunning"), + false, "Test Op"); Wait.waitFor(() -> IN_PROGRESS == getTxStatus(sctx, fateId)); // This is false because the transaction runner has reserved the FaTe // transaction. @@ -372,7 +373,7 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu FateId fateId = fate.startTransaction(); LOG.debug("Starting test testCancelWhileInCall with {}", fateId); assertEquals(NEW, getTxStatus(sctx, fateId)); - fate.seedTransaction("TestOperation", fateId, new TestRepo("testCancelWhileInCall"), true, + fate.seedTransaction(TEST_FATE_OP, fateId, new TestRepo("testCancelWhileInCall"), true, "Test Op"); assertEquals(SUBMITTED, getTxStatus(sctx, fateId)); // wait for call() to be called @@ -488,8 +489,8 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS); FateId fateId = fate.startTransaction(); assertEquals(NEW, getTxStatus(sctx, fateId)); - fate.seedTransaction("TestOperationFails", fateId, - new TestOperationFails(1, ExceptionLocation.CALL), false, "Test Op Fails"); + fate.seedTransaction(TEST_FATE_OP, fateId, new TestOperationFails(1, ExceptionLocation.CALL), + false, "Test Op Fails"); // Wait for all the undo() calls to complete undoLatch.await(); assertEquals(expectedUndoOrder, TestOperationFails.undoOrder); @@ -502,7 +503,7 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS); fateId = fate.startTransaction(); assertEquals(NEW, getTxStatus(sctx, fateId)); - fate.seedTransaction("TestOperationFails", fateId, + fate.seedTransaction(TEST_FATE_OP, fateId, new TestOperationFails(1, ExceptionLocation.IS_READY), false, "Test Op Fails"); // Wait for all the undo() calls to complete undoLatch.await(); @@ -546,8 +547,8 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu FateId fateId = fate.startTransaction(); transactions.add(fateId); assertEquals(TStatus.NEW, getTxStatus(sctx, fateId)); - fate.seedTransaction("TestOperation", fateId, new DeferredTestRepo("testDeferredOverflow"), - true, "Test Op"); + fate.seedTransaction(TEST_FATE_OP, fateId, new DeferredTestRepo("testDeferredOverflow"), true, + "Test Op"); assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, fateId)); } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java index 1980dcf4ee..d029ebb489 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java @@ -19,6 +19,7 @@ package org.apache.accumulo.test.fate; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.test.fate.FateStoreUtil.TEST_FATE_OP; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -298,8 +299,8 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT FateKey fateKey2 = FateKey.forCompactionCommit(ExternalCompactionId.generate(UUID.randomUUID())); - var fateId1 = store.seedTransaction("TEST", fateKey1, new TestRepo(), true).orElseThrow(); - var fateId2 = store.seedTransaction("TEST", fateKey2, new TestRepo(), true).orElseThrow(); + var fateId1 = store.seedTransaction(TEST_FATE_OP, fateKey1, new TestRepo(), true).orElseThrow(); + var fateId2 = store.seedTransaction(TEST_FATE_OP, fateKey2, new TestRepo(), true).orElseThrow(); assertNotEquals(fateId1, fateId2); @@ -333,11 +334,11 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new Text("aaa")); FateKey fateKey = FateKey.forSplit(ke); - var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(), true).orElseThrow(); + var fateId = store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(), true).orElseThrow(); // second call is empty - assertTrue(store.seedTransaction("TEST", fateKey, new TestRepo(), true).isEmpty()); - assertFalse(store.seedTransaction("TEST", fateId, new TestRepo(), true)); + assertTrue(store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(), true).isEmpty()); + assertFalse(store.seedTransaction(TEST_FATE_OP, fateId, new TestRepo(), true)); var txStore = store.reserve(fateId); try { @@ -362,7 +363,7 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new Text("aaa")); FateKey fateKey = FateKey.forSplit(ke); - var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(), true).orElseThrow(); + var fateId = store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(), true).orElseThrow(); var txStore = store.reserve(fateId); try { @@ -371,7 +372,7 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT // We have an existing transaction with the same key in progress // so should return an empty Optional - assertTrue(store.seedTransaction("TEST", fateKey, new TestRepo(), true).isEmpty()); + assertTrue(store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(), true).isEmpty()); assertEquals(TStatus.IN_PROGRESS, txStore.getStatus()); } finally { txStore.setStatus(TStatus.SUCCESSFUL); @@ -383,7 +384,8 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT try { // After deletion, make sure we can create again with the same key - var fateId2 = store.seedTransaction("TEST", fateKey, new TestRepo(), true).orElseThrow(); + var fateId2 = + store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(), true).orElseThrow(); txStore = store.reserve(fateId); assertEquals(fateId, fateId2); assertTrue(txStore.timeCreated() > 0); @@ -424,10 +426,10 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT FateKey fateKey1 = FateKey.forSplit(ke1); FateKey fateKey2 = FateKey.forSplit(ke2); - var fateId1 = store.seedTransaction("TEST", fateKey1, new TestRepo(), true).orElseThrow(); + var fateId1 = store.seedTransaction(TEST_FATE_OP, fateKey1, new TestRepo(), true).orElseThrow(); var txStore = store.reserve(fateId1); try { - assertTrue(store.seedTransaction("TEST", fateKey2, new TestRepo(), true).isEmpty()); + assertTrue(store.seedTransaction(TEST_FATE_OP, fateKey2, new TestRepo(), true).isEmpty()); assertEquals(fateKey1, txStore.getKey().orElseThrow()); } finally { txStore.delete(); @@ -447,14 +449,14 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new Text("aaa")); FateKey fateKey = FateKey.forSplit(ke); - var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(), true).orElseThrow(); + var fateId = store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(), true).orElseThrow(); // After seeding a fate transaction using a key we can simulate a collision with // a random FateId by deleting the key out of Fate and calling seed again to // verify it detects the key is missing. Then we can continue and see if we can still use // the existing transaction. deleteKey(fateId, sctx); - assertTrue(store.seedTransaction("TEST", fateKey, new TestRepo(), true).isEmpty()); + assertTrue(store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(), true).isEmpty()); var txStore = store.reserve(fateId); // We should still be able to use the existing transaction @@ -600,8 +602,8 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT // have 10 threads all try to seed the same fate key, only one should succeed. List<Future<Optional<FateId>>> futures = new ArrayList<>(10); for (int i = 0; i < 10; i++) { - futures.add( - executor.submit(() -> store.seedTransaction("TEST", fateKey, new TestRepo(), true))); + futures.add(executor + .submit(() -> store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(), true))); } int idsSeen = 0; @@ -683,7 +685,7 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT Map<FateKey,FateId> fateKeyIds = new HashMap<>(); for (FateKey fateKey : List.of(fateKey1, fateKey2, fateKey3, fateKey4)) { - var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(), true).orElseThrow(); + var fateId = store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(), true).orElseThrow(); fateKeyIds.put(fateKey, fateId); } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java index 1f96bd9b94..83bb8a3e5b 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java @@ -30,6 +30,7 @@ import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.client.admin.TabletInformation; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; @@ -39,9 +40,14 @@ import org.junit.jupiter.api.io.TempDir; import com.google.common.collect.MoreCollectors; /** - * A class with utility methods for testing UserFateStore and MetaFateStore + * A class with utilities for testing {@link org.apache.accumulo.core.fate.user.UserFateStore} and + * {@link org.apache.accumulo.core.fate.zookeeper.MetaFateStore} */ public class FateStoreUtil { + // A FateOperation for testing purposes when a FateOperation is needed but whose value doesn't + // matter + public static final Fate.FateOperation TEST_FATE_OP = Fate.FateOperation.TABLE_CREATE; + /** * Create the fate table with the exact configuration as the real Fate user instance table * including table properties and TabletAvailability. For use in testing UserFateStore diff --git a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java index 292f1fdfb4..9dd6577018 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.fate; +import static org.apache.accumulo.test.fate.FateStoreUtil.TEST_FATE_OP; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -253,10 +254,10 @@ public abstract class MultipleStoresIT extends SharedMiniClusterBase { // Start half the txns using fate1, and the other half using fate2 if (i % 2 == 0) { fateId = fate1.startTransaction(); - fate1.seedTransaction("op" + i, fateId, new SleepingTestRepo(), true, "test"); + fate1.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(), true, "test"); } else { fateId = fate2.startTransaction(); - fate2.seedTransaction("op" + i, fateId, new SleepingTestRepo(), true, "test"); + fate2.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(), true, "test"); } allIds.add(fateId); } @@ -311,7 +312,7 @@ public abstract class MultipleStoresIT extends SharedMiniClusterBase { for (int i = 0; i < numFateIds; i++) { FateId fateId; fateId = fate1.startTransaction(); - fate1.seedTransaction("op" + i, fateId, new LatchTestRepo(), true, "test"); + fate1.seedTransaction(TEST_FATE_OP, fateId, new LatchTestRepo(), true, "test"); allIds.add(fateId); } assertEquals(numFateIds, allIds.size()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java index 5e5775110f..f74944692e 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java @@ -50,6 +50,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.AdminUtil; +import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; @@ -421,10 +422,10 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { log.trace("Fate id: {}, status: {}", tx.getFateId(), tx.getStatus()); String top = tx.getTop(); - String txName = tx.getTxName(); + Fate.FateOperation txName = tx.getTxName(); return top != null && txName != null && top.contains("CompactionDriver") - && tx.getTxName().equals("TABLE_COMPACT"); + && txName == Fate.FateOperation.TABLE_COMPACT; } /**