This is an automated email from the ASF dual-hosted git repository.

krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new f9d8afebba Several misc Fate changes (#4912)
f9d8afebba is described below

commit f9d8afebbad00d2db476e23b8bedff839441b2c1
Author: Kevin Rathbun <krath...@apache.org>
AuthorDate: Thu Oct 3 10:04:41 2024 -0400

    Several misc Fate changes (#4912)
    
    - Add a toString() to FateKey
    - Move MetaFateStore to org.apache.accumulo.core.fate.zookeeper
    - Periodic clean up of dead reservations increased from every 30 seconds to 
every few minutes
    - New fate test case added to FateIT that ensures no write ops can be 
performed on a transaction after it has been deleted
            - Added new check to verifyReserved() that checks whether the 
transaction is deleted
            - Fixed UserFateStoreIT to work with new change and misc cleanup to 
the class
    - created new class FastFate which performs the dead reservation cleanup
      more often (used in testing)
---
 .../accumulo/core/fate/AbstractFateStore.java      | 14 +++----
 .../java/org/apache/accumulo/core/fate/Fate.java   |  8 +++-
 .../org/apache/accumulo/core/fate/FateKey.java     | 11 +++++
 .../accumulo/core/fate/user/UserFateStore.java     | 18 ++++-----
 .../core/fate/{ => zookeeper}/MetaFateStore.java   | 30 ++++++++------
 .../org/apache/accumulo/server/util/Admin.java     |  2 +-
 .../java/org/apache/accumulo/manager/Manager.java  |  2 +-
 .../manager/metrics/fate/meta/MetaFateMetrics.java |  2 +-
 .../test/compaction/ExternalCompaction_1_IT.java   |  2 +-
 .../org/apache/accumulo/test/fate/FastFate.java    | 43 ++++++++++++++++++++
 .../java/org/apache/accumulo/test/fate/FateIT.java | 32 +++++++++++++++
 .../accumulo/test/fate/FateOpsCommandsIT.java      |  2 +-
 .../accumulo/test/fate/MultipleStoresIT.java       | 24 +++++------
 .../apache/accumulo/test/fate/meta/MetaFateIT.java |  2 +-
 .../test/fate/meta/MetaFateInterleavingIT.java     |  2 +-
 .../test/fate/meta/MetaFateOpsCommandsIT.java      |  2 +-
 .../test/fate/meta/MetaFateStoreFateIT.java        |  2 +-
 .../accumulo/test/fate/user/UserFateStoreIT.java   | 47 +++++++---------------
 .../test/functional/FateConcurrencyIT.java         |  2 +-
 .../test/functional/FunctionalTestUtils.java       |  2 +-
 20 files changed, 162 insertions(+), 87 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
index 97b391e218..ff5e45d310 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@ -310,7 +310,7 @@ public abstract class AbstractFateStore<T> implements 
FateStore<T> {
     public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
       Preconditions.checkState(!isReserved(),
           "Attempted to wait for status change while reserved: " + fateId);
-      verifyReserved(false);
+      verifyReservedAndNotDeleted(false);
 
       int currNumCallers = concurrentStatusChangeCallers.incrementAndGet();
 
@@ -375,16 +375,14 @@ public abstract class AbstractFateStore<T> implements 
FateStore<T> {
 
     protected abstract void unreserve();
 
-    protected void verifyReserved(boolean isWrite) {
-      if (!isReserved() && isWrite) {
-        throw new IllegalStateException(
-            "Attempted write on unreserved FATE transaction: " + fateId);
-      }
+    protected void verifyReservedAndNotDeleted(boolean isWrite) {
+      Preconditions.checkState(!isWrite || (isReserved() && !deleted),
+          "Attempted write on unreserved or deleted FATE transaction: " + 
fateId);
     }
 
     @Override
     public TStatus getStatus() {
-      verifyReserved(false);
+      verifyReservedAndNotDeleted(false);
       var status = _getStatus(fateId);
       observedStatus = status;
       return status;
@@ -392,7 +390,7 @@ public abstract class AbstractFateStore<T> implements 
FateStore<T> {
 
     @Override
     public Optional<FateKey> getKey() {
-      verifyReserved(false);
+      verifyReservedAndNotDeleted(false);
       return AbstractFateStore.this.getKey(fateId);
     }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index e2d4e7cbe5..b6860c557d 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -383,8 +383,8 @@ public class Fate<T> {
       // reservations held by dead processes, if they exist.
       deadResCleanerExecutor = 
ThreadPools.getServerThreadPools().createScheduledExecutorService(1,
           store.type() + "-dead-reservation-cleaner-pool");
-      ScheduledFuture<?> deadReservationCleaner = deadResCleanerExecutor
-          .scheduleWithFixedDelay(new DeadReservationCleaner(), 3, 30, 
SECONDS);
+      ScheduledFuture<?> deadReservationCleaner = 
deadResCleanerExecutor.scheduleWithFixedDelay(
+          new DeadReservationCleaner(), 3, 
getDeadResCleanupDelay().toSeconds(), SECONDS);
       ThreadPools.watchCriticalScheduledTask(deadReservationCleaner);
     }
     this.deadResCleanerExecutor = deadResCleanerExecutor;
@@ -393,6 +393,10 @@ public class Fate<T> {
     this.workFinder.start();
   }
 
+  public Duration getDeadResCleanupDelay() {
+    return Duration.ofMinutes(3);
+  }
+
   // get a transaction id back to the requester before doing any work
   public FateId startTransaction() {
     return store.create();
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java 
b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java
index 6c1663627c..8942149a6f 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java
@@ -28,6 +28,8 @@ import java.util.Optional;
 
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.hadoop.io.DataInputBuffer;
 
 public class FateKey {
@@ -168,4 +170,13 @@ public class FateKey {
         throw new IllegalStateException("Unexpected FateInstanceType found " + 
type);
     }
   }
+
+  @Override
+  public String toString() {
+    var buf = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
+    buf.append("FateKeyType", type);
+    keyExtent.ifPresentOrElse(keyExtent -> buf.append("KeyExtent", keyExtent),
+        () -> buf.append("ExternalCompactionID", compactionId.orElseThrow()));
+    return buf.toString();
+  }
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index f1f82758ff..e1cb4d6405 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@ -420,7 +420,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
 
     @Override
     public Repo<T> top() {
-      verifyReserved(false);
+      verifyReservedAndNotDeleted(false);
 
       return scanTx(scanner -> {
         scanner.setRange(getRow(fateId));
@@ -436,7 +436,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
 
     @Override
     public List<ReadOnlyRepo<T>> getStack() {
-      verifyReserved(false);
+      verifyReservedAndNotDeleted(false);
 
       return scanTx(scanner -> {
         scanner.setRange(getRow(fateId));
@@ -451,7 +451,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
 
     @Override
     public Serializable getTransactionInfo(TxInfo txInfo) {
-      verifyReserved(false);
+      verifyReservedAndNotDeleted(false);
 
       try (Scanner scanner = context.createScanner(tableName, 
Authorizations.EMPTY)) {
         scanner.setRange(getRow(fateId));
@@ -487,7 +487,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
 
     @Override
     public long timeCreated() {
-      verifyReserved(false);
+      verifyReservedAndNotDeleted(false);
 
       return scanTx(scanner -> {
         scanner.setRange(getRow(fateId));
@@ -499,7 +499,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
 
     @Override
     public void push(Repo<T> repo) throws StackOverflowException {
-      verifyReserved(true);
+      verifyReservedAndNotDeleted(true);
 
       Optional<Integer> top = findTop();
 
@@ -514,7 +514,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
 
     @Override
     public void pop() {
-      verifyReserved(true);
+      verifyReservedAndNotDeleted(true);
 
       Optional<Integer> top = findTop();
       top.ifPresent(t -> newMutator(fateId)
@@ -523,7 +523,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
 
     @Override
     public void setStatus(TStatus status) {
-      verifyReserved(true);
+      verifyReservedAndNotDeleted(true);
 
       newMutator(fateId).putStatus(status).mutate();
       observedStatus = status;
@@ -531,7 +531,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
 
     @Override
     public void setTransactionInfo(TxInfo txInfo, Serializable so) {
-      verifyReserved(true);
+      verifyReservedAndNotDeleted(true);
 
       final byte[] serialized = serializeTxInfo(so);
 
@@ -540,7 +540,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
 
     @Override
     public void delete() {
-      verifyReserved(true);
+      verifyReservedAndNotDeleted(true);
 
       var mutator = newMutator(fateId);
       mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED, 
TStatus.SUCCESSFUL, TStatus.FAILED);
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
similarity index 96%
rename from core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java
rename to 
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
index 08247e7441..d19db17004 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.fate;
+package org.apache.accumulo.core.fate.zookeeper;
 
 import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -39,9 +39,15 @@ import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 import 
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
+import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.Fate;
 import org.apache.accumulo.core.fate.Fate.TxInfo;
-import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.fate.FateKey;
+import org.apache.accumulo.core.fate.ReadOnlyRepo;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.fate.StackOverflowException;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -239,7 +245,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
 
     @Override
     public Repo<T> top() {
-      verifyReserved(false);
+      verifyReservedAndNotDeleted(false);
 
       for (int i = 0; i < RETRIES; i++) {
         String txpath = getTXPath(fateId);
@@ -291,7 +297,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
 
     @Override
     public void push(Repo<T> repo) throws StackOverflowException {
-      verifyReserved(true);
+      verifyReservedAndNotDeleted(true);
 
       String txpath = getTXPath(fateId);
       try {
@@ -310,7 +316,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
 
     @Override
     public void pop() {
-      verifyReserved(true);
+      verifyReservedAndNotDeleted(true);
 
       try {
         String txpath = getTXPath(fateId);
@@ -326,7 +332,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
 
     @Override
     public void setStatus(TStatus status) {
-      verifyReserved(true);
+      verifyReservedAndNotDeleted(true);
 
       try {
         zk.mutateExisting(getTXPath(fateId), currSerializedData -> {
@@ -353,7 +359,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
 
     @Override
     public void delete() {
-      verifyReserved(true);
+      verifyReservedAndNotDeleted(true);
 
       try {
         zk.recursiveDelete(getTXPath(fateId), NodeMissingPolicy.SKIP);
@@ -365,7 +371,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
 
     @Override
     public void setTransactionInfo(Fate.TxInfo txInfo, Serializable so) {
-      verifyReserved(true);
+      verifyReservedAndNotDeleted(true);
 
       try {
         zk.putPersistentData(getTXPath(fateId) + "/" + txInfo, 
serializeTxInfo(so),
@@ -377,14 +383,14 @@ public class MetaFateStore<T> extends 
AbstractFateStore<T> {
 
     @Override
     public Serializable getTransactionInfo(Fate.TxInfo txInfo) {
-      verifyReserved(false);
+      verifyReservedAndNotDeleted(false);
 
       return MetaFateStore.this.getTransactionInfo(txInfo, fateId);
     }
 
     @Override
     public long timeCreated() {
-      verifyReserved(false);
+      verifyReservedAndNotDeleted(false);
 
       try {
         Stat stat = zk.getZooKeeper().exists(getTXPath(fateId), false);
@@ -396,7 +402,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
 
     @Override
     public List<ReadOnlyRepo<T>> getStack() {
-      verifyReserved(false);
+      verifyReservedAndNotDeleted(false);
       String txpath = getTXPath(fateId);
 
       outer: while (true) {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java 
b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index f2aa438661..5e26567ac4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@ -71,9 +71,9 @@ import org.apache.accumulo.core.fate.AdminUtil;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore;
 import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.core.fate.zookeeper.ZooCache;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.lock.ServiceLock;
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 0af05eb5b7..ae75437225 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -76,8 +76,8 @@ import org.apache.accumulo.core.fate.FateCleaner;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
 import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java
index 1087cf1b9b..02aa3a28f4 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java
@@ -26,8 +26,8 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.fate.AbstractFateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.manager.metrics.fate.FateMetrics;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.zookeeper.KeeperException;
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index 314212693a..e8955e465a 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@ -78,8 +78,8 @@ import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateKey;
 import org.apache.accumulo.core.fate.FateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
 import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.core.iterators.DevNull;
 import org.apache.accumulo.core.iterators.Filter;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java 
b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java
new file mode 100644
index 0000000000..71b198c0ac
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import java.time.Duration;
+import java.util.function.Function;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.Repo;
+
+/**
+ * A FATE which performs the dead reservation cleanup with a much shorter 
delay between
+ */
+public class FastFate<T> extends Fate<T> {
+
+  public FastFate(T environment, FateStore<T> store, boolean runDeadResCleaner,
+      Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
+    super(environment, store, runDeadResCleaner, toLogStrFunc, conf);
+  }
+
+  @Override
+  public Duration getDeadResCleanupDelay() {
+    return Duration.ofSeconds(15);
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
index bd7c4a2395..d36e98bdec 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
@@ -24,8 +24,10 @@ import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRES
 import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW;
 import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
 import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
@@ -43,6 +45,7 @@ import org.apache.accumulo.core.fate.AbstractFateStore;
 import org.apache.accumulo.core.fate.Fate;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -479,6 +482,35 @@ public abstract class FateIT extends SharedMiniClusterBase 
implements FateTestRu
     }
   }
 
+  @Test
+  @Timeout(30)
+  public void testNoWriteAfterDelete() throws Exception {
+    executeTest(this::testNoWriteAfterDelete);
+  }
+
+  protected void testNoWriteAfterDelete(FateStore<TestEnv> store, 
ServerContext sctx)
+      throws Exception {
+    final String tableName = getUniqueNames(1)[0];
+    final FateId fateId = store.create();
+    final Repo<TestEnv> repo = new TestRepo("testNoWriteAfterDelete");
+
+    var txStore = store.reserve(fateId);
+
+    // all write ops should be ok after reservation
+    assertDoesNotThrow(() -> txStore.push(repo));
+    assertDoesNotThrow(() -> 
txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL));
+    assertDoesNotThrow(txStore::pop);
+    assertDoesNotThrow(() -> txStore.setTransactionInfo(Fate.TxInfo.TX_NAME, 
"name"));
+    assertDoesNotThrow(txStore::delete);
+
+    // test that all write ops result in an exception since the tx has been 
deleted
+    assertThrows(Exception.class, () -> txStore.push(repo));
+    assertThrows(Exception.class, () -> 
txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL));
+    assertThrows(Exception.class, txStore::pop);
+    assertThrows(Exception.class, () -> 
txStore.setTransactionInfo(Fate.TxInfo.TX_NAME, "name"));
+    assertThrows(Exception.class, txStore::delete);
+  }
+
   private void submitDeferred(Fate<TestEnv> fate, ServerContext sctx, 
Set<FateId> transactions) {
     FateId fateId = fate.startTransaction();
     transactions.add(fateId);
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
index 8b52f88f97..0db83c044a 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java
@@ -59,9 +59,9 @@ import org.apache.accumulo.core.fate.Fate;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore;
 import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.core.iterators.IteratorUtil;
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
index 57070bacde..f5e537394d 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
@@ -46,10 +46,10 @@ import org.apache.accumulo.core.fate.Fate;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -111,7 +111,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase 
{
     final int numFateIds = 500;
     final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID());
     final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new 
ArrayList<>();
-    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final boolean isUserStore = storeType == FateInstanceType.USER;
     final Set<FateId> allIds = new HashSet<>();
     final FateStore<SleepingTestEnv> store1, store2;
     final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
@@ -182,7 +182,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase 
{
     // Tests that reserve() doesn't hang indefinitely and instead throws an 
error
     // on reserve() a non-existent transaction.
     final FateStore<SleepingTestEnv> store;
-    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final boolean isUserStore = storeType == FateInstanceType.USER;
     final String tableName = getUniqueNames(1)[0];
     final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID());
     final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
@@ -208,7 +208,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase 
{
       throws Exception {
     final String tableName = getUniqueNames(1)[0];
     final int numFateIds = 500;
-    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final boolean isUserStore = storeType == FateInstanceType.USER;
     final Set<FateId> allIds = new HashSet<>();
     final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new 
ArrayList<>();
     final FateStore<SleepingTestEnv> store;
@@ -256,7 +256,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase 
{
       throws Exception {
     final String tableName = getUniqueNames(1)[0];
     final int numFateIds = 500;
-    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final boolean isUserStore = storeType == FateInstanceType.USER;
     final Set<FateId> allIds = new HashSet<>();
     final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new 
ArrayList<>();
     final FateStore<SleepingTestEnv> store;
@@ -312,7 +312,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase 
{
   private void testMultipleFateInstances(FateInstanceType storeType) throws 
Exception {
     final String tableName = getUniqueNames(1)[0];
     final int numFateIds = 500;
-    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final boolean isUserStore = storeType == FateInstanceType.USER;
     final Set<FateId> allIds = new HashSet<>();
     final FateStore<SleepingTestEnv> store1, store2;
     final SleepingTestEnv testEnv1 = new SleepingTestEnv(50);
@@ -380,7 +380,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase 
{
     // One transaction for each FATE worker thread
     final int numFateIds =
         
Integer.parseInt(Property.MANAGER_FATE_THREADPOOL_SIZE.getDefaultValue());
-    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final boolean isUserStore = storeType == FateInstanceType.USER;
     final Set<FateId> allIds = new HashSet<>();
     final FateStore<LatchTestEnv> store1, store2;
     final LatchTestEnv testEnv1 = new LatchTestEnv();
@@ -399,8 +399,8 @@ public class MultipleStoresIT extends SharedMiniClusterBase 
{
     }
     liveLocks.add(lock1);
 
-    Fate<LatchTestEnv> fate1 =
-        new Fate<>(testEnv1, store1, true, Object::toString, 
DefaultConfiguration.getInstance());
+    FastFate<LatchTestEnv> fate1 = new FastFate<>(testEnv1, store1, true, 
Object::toString,
+        DefaultConfiguration.getInstance());
 
     // Ensure nothing is reserved yet
     assertTrue(store1.getActiveReservations().isEmpty());
@@ -445,8 +445,8 @@ public class MultipleStoresIT extends SharedMiniClusterBase 
{
 
     // Create the new Fate/start the Fate threads (the work finder and the 
workers).
     // Don't run another dead reservation cleaner since we already have one 
running from fate1.
-    Fate<LatchTestEnv> fate2 =
-        new Fate<>(testEnv2, store2, false, Object::toString, 
DefaultConfiguration.getInstance());
+    FastFate<LatchTestEnv> fate2 = new FastFate<>(testEnv2, store2, false, 
Object::toString,
+        DefaultConfiguration.getInstance());
 
     // Wait for the "dead" reservations to be deleted and picked up again 
(reserved using
     // fate2/store2/lock2 now).
@@ -458,7 +458,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase 
{
       boolean allReservedWithLock2 = store2Reservations.values().stream()
           .allMatch(entry -> 
FateStore.FateReservation.locksAreEqual(entry.getLockID(), lock2));
       return store2Reservations.keySet().equals(allIds) && 
allReservedWithLock2;
-    }, 60_000);
+    }, fate1.getDeadResCleanupDelay().toMillis() * 2);
 
     // Finish work and shutdown
     testEnv1.workersLatch.countDown();
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
index a23dde0644..c5f541b5e9 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
@@ -32,8 +32,8 @@ import java.util.UUID;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator;
 import org.apache.accumulo.core.fate.FateId;
-import org.apache.accumulo.core.fate.MetaFateStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.test.fate.FateIT;
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java
 
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java
index bfd267630f..d306e0bfef 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java
@@ -24,7 +24,7 @@ import java.util.UUID;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.fate.AbstractFateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.test.fate.FateInterleavingIT;
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java
 
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java
index 994c7af2eb..c4c1e5b24a 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java
@@ -22,7 +22,7 @@ import static 
org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.fate.AbstractFateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.test.fate.FateOpsCommandsIT;
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
 
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
index beb48a5304..af8b98db0f 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
@@ -36,8 +36,8 @@ import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateStore;
-import org.apache.accumulo.core.fate.MetaFateStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.ServerContext;
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java
index 55f89cd605..c82662182c 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java
@@ -24,9 +24,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.BatchWriter;
@@ -40,6 +37,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.fate.AbstractFateStore;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateStore;
@@ -76,29 +74,6 @@ public class UserFateStoreIT extends SharedMiniClusterBase {
     SharedMiniClusterBase.stopMiniCluster();
   }
 
-  private static class TestUserFateStore extends UserFateStore<TestEnv> {
-    private final Iterator<FateId> fateIdIterator;
-
-    // use the list of fateIds to simulate collisions on fateIds
-    public TestUserFateStore(ClientContext context, String tableName, 
List<FateId> fateIds) {
-      super(context, tableName, createDummyLockID(), null);
-      this.fateIdIterator = fateIds.iterator();
-    }
-
-    @Override
-    public FateId getFateId() {
-      if (fateIdIterator.hasNext()) {
-        return fateIdIterator.next();
-      } else {
-        return FateId.from(fateInstanceType, UUID.randomUUID());
-      }
-    }
-
-    public TStatus getStatus(FateId fateId) {
-      return _getStatus(fateId);
-    }
-  }
-
   // Test that configs related to the correctness of the FATE instance user 
table
   // are initialized correctly
   @Test
@@ -151,7 +126,7 @@ public class UserFateStoreIT extends SharedMiniClusterBase {
     String tableName;
     ClientContext client;
     FateId fateId;
-    TestUserFateStore store;
+    UserFateStore<TestEnv> store;
     FateStore.FateTxStore<FateIT.TestEnv> txStore;
 
     @BeforeEach
@@ -159,9 +134,8 @@ public class UserFateStoreIT extends SharedMiniClusterBase {
       client = (ClientContext) 
Accumulo.newClient().from(getClientProps()).build();
       tableName = getUniqueNames(1)[0];
       createFateTable(client, tableName);
-      fateId = FateId.from(fateInstanceType, UUID.randomUUID());
-      store = new TestUserFateStore(client, tableName, List.of(fateId));
-      store.create();
+      store = new UserFateStore<>(client, tableName, 
AbstractFateStore.createDummyLockID(), null);
+      fateId = store.create();
       txStore = store.reserve(fateId);
     }
 
@@ -177,7 +151,10 @@ public class UserFateStoreIT extends SharedMiniClusterBase 
{
         beforeOperation.run();
 
         injectStatus(client, tableName, fateId, status);
-        assertEquals(status, store.getStatus(fateId));
+        var fateIdStatus =
+            store.list().filter(statusEntry -> 
statusEntry.getFateId().equals(fateId)).findFirst()
+                .orElseThrow();
+        assertEquals(status, fateIdStatus.getStatus());
         if (!acceptableStatuses.contains(status)) {
           assertThrows(IllegalStateException.class, operation,
               "Expected operation to fail with status " + status + " but it 
did not");
@@ -210,8 +187,12 @@ public class UserFateStoreIT extends SharedMiniClusterBase 
{
 
     @Test
     public void delete() throws Exception {
-      testOperationWithStatuses(() -> {}, // No special setup needed for delete
-          txStore::delete,
+      testOperationWithStatuses(() -> {
+        // Setup for delete: Create a new txStore before each delete since 
delete cannot be called
+        // on the same txStore more than once
+        fateId = store.create();
+        txStore = store.reserve(fateId);
+      }, () -> txStore.delete(),
           EnumSet.of(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, 
TStatus.FAILED));
     }
   }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
index c5e6e5eea1..5e5775110f 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
@@ -51,9 +51,9 @@ import org.apache.accumulo.core.data.InstanceId;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.fate.AdminUtil;
 import org.apache.accumulo.core.fate.FateInstanceType;
-import org.apache.accumulo.core.fate.MetaFateStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore;
 import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.core.manager.state.tables.TableState;
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
index 9172a2d7b4..28b08dbbf0 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
@@ -62,9 +62,9 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.AdminUtil;
 import org.apache.accumulo.core.fate.AdminUtil.FateStatus;
 import org.apache.accumulo.core.fate.FateInstanceType;
-import org.apache.accumulo.core.fate.MetaFateStore;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore;
 import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.metadata.StoredTabletFile;


Reply via email to