This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 4b46991aa5 Add a new AccumuloStore for FATE (#4049)
4b46991aa5 is described below

commit 4b46991aa5d16c529af59d50772e6269acfd415b
Author: Christopher L. Shannon <christopher.l.shan...@gmail.com>
AuthorDate: Fri Dec 22 07:50:21 2023 -0500

    Add a new AccumuloStore for FATE (#4049)
    
    This adds support for storing FATE data in an Accumulo table instead of 
storing in
    Zookeeper
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../accumulo/core/fate/AbstractFateStore.java      | 328 +++++++++++++++++++
 .../org/apache/accumulo/core/fate/ZooStore.java    | 290 +----------------
 .../accumulo/core/fate/accumulo/AccumuloStore.java | 287 +++++++++++++++++
 .../accumulo/core/fate/accumulo/FateMutator.java   |  47 +++
 .../core/fate/accumulo/FateMutatorImpl.java        | 143 +++++++++
 .../core/fate/accumulo/schema/FateSchema.java      |  59 ++++
 .../java/org/apache/accumulo/test/fate/FateIT.java | 289 +++++++++++++++++
 .../test/fate/accumulo/AccumuloFateIT.java         |  91 ++++++
 .../fate/accumulo/AccumuloStoreReadWriteIT.java    | 131 ++++++++
 .../accumulo/test/fate/zookeeper/FateIT.java       | 352 ---------------------
 .../test/fate/zookeeper/ZookeeperFateIT.java       | 119 +++++++
 11 files changed, 1507 insertions(+), 629 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
new file mode 100644
index 0000000000..5e840d3247
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.fate.Fate.TxInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public abstract class AbstractFateStore<T> implements FateStore<T> {
+
+  private static final Logger log = 
LoggerFactory.getLogger(AbstractFateStore.class);
+
+  protected final Set<Long> reserved;
+  protected final Map<Long,Long> defered;
+
+  // This is incremented each time a transaction was unreserved that was non 
new
+  protected final SignalCount unreservedNonNewCount = new SignalCount();
+
+  // This is incremented each time a transaction is unreserved that was 
runnable
+  protected final SignalCount unreservedRunnableCount = new SignalCount();
+
+  public AbstractFateStore() {
+    this.reserved = new HashSet<>();
+    this.defered = new HashMap<>();
+  }
+
+  public static byte[] serialize(Object o) {
+    try {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      ObjectOutputStream oos = new ObjectOutputStream(baos);
+      oos.writeObject(o);
+      oos.close();
+
+      return baos.toByteArray();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @SuppressFBWarnings(value = "OBJECT_DESERIALIZATION",
+      justification = "unsafe to store arbitrary serialized objects like this, 
but needed for now"
+          + " for backwards compatibility")
+  public static Object deserialize(byte[] ser) {
+    try {
+      ByteArrayInputStream bais = new ByteArrayInputStream(ser);
+      ObjectInputStream ois = new ObjectInputStream(bais);
+      return ois.readObject();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    } catch (ReflectiveOperationException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * Attempt to reserve transaction
+   *
+   * @param tid transaction id
+   * @return true if reserved by this call, false if already reserved
+   */
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(long tid) {
+    synchronized (this) {
+      if (!reserved.contains(tid)) {
+        return Optional.of(reserve(tid));
+      }
+      return Optional.empty();
+    }
+  }
+
+  @Override
+  public FateTxStore<T> reserve(long tid) {
+    synchronized (AbstractFateStore.this) {
+      while (reserved.contains(tid)) {
+        try {
+          AbstractFateStore.this.wait(100);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(e);
+        }
+      }
+
+      reserved.add(tid);
+      return newFateTxStore(tid, true);
+    }
+  }
+
+  @Override
+  public Iterator<Long> runnable(AtomicBoolean keepWaiting) {
+
+    while (keepWaiting.get()) {
+      ArrayList<Long> runnableTids = new ArrayList<>();
+
+      final long beforeCount = unreservedRunnableCount.getCount();
+
+      List<String> transactions = getTransactions();
+      for (String txidStr : transactions) {
+        long txid = parseTid(txidStr);
+        if (isRunnable(_getStatus(txid))) {
+          runnableTids.add(txid);
+        }
+      }
+
+      synchronized (this) {
+        runnableTids.removeIf(txid -> {
+          var deferedTime = defered.get(txid);
+          if (deferedTime != null) {
+            if (deferedTime >= System.currentTimeMillis()) {
+              return true;
+            } else {
+              defered.remove(txid);
+            }
+          }
+
+          if (reserved.contains(txid)) {
+            return true;
+          }
+
+          return false;
+        });
+      }
+
+      if (runnableTids.isEmpty()) {
+        if (beforeCount == unreservedRunnableCount.getCount()) {
+          long waitTime = 5000;
+          if (!defered.isEmpty()) {
+            Long minTime = Collections.min(defered.values());
+            waitTime = minTime - System.currentTimeMillis();
+          }
+
+          if (waitTime > 0) {
+            unreservedRunnableCount.waitFor(count -> count != beforeCount, 
waitTime,
+                keepWaiting::get);
+          }
+        }
+      } else {
+        return runnableTids.iterator();
+      }
+
+    }
+
+    return List.<Long>of().iterator();
+  }
+
+  @Override
+  public List<Long> list() {
+    ArrayList<Long> l = new ArrayList<>();
+    List<String> transactions = getTransactions();
+    for (String txid : transactions) {
+      l.add(parseTid(txid));
+    }
+    return l;
+  }
+
+  @Override
+  public ReadOnlyFateTxStore<T> read(long tid) {
+    return newFateTxStore(tid, false);
+  }
+
+  protected boolean isRunnable(TStatus status) {
+    return status == TStatus.IN_PROGRESS || status == 
TStatus.FAILED_IN_PROGRESS
+        || status == TStatus.SUBMITTED;
+  }
+
+  protected long parseTid(String txdir) {
+    return Long.parseLong(txdir.split("_")[1], 16);
+  }
+
+  protected abstract List<String> getTransactions();
+
+  protected abstract TStatus _getStatus(long tid);
+
+  protected abstract FateTxStore<T> newFateTxStore(long tid, boolean 
isReserved);
+
+  protected abstract class AbstractFateTxStoreImpl<T> implements 
FateTxStore<T> {
+    protected final long tid;
+    protected final boolean isReserved;
+
+    protected TStatus observedStatus = null;
+
+    protected AbstractFateTxStoreImpl(long tid, boolean isReserved) {
+      this.tid = tid;
+      this.isReserved = isReserved;
+    }
+
+    @Override
+    public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
+      Preconditions.checkState(!isReserved,
+          "Attempted to wait for status change while reserved " + 
FateTxId.formatTid(getID()));
+      while (true) {
+
+        long countBefore = unreservedNonNewCount.getCount();
+
+        TStatus status = _getStatus(tid);
+        if (expected.contains(status)) {
+          return status;
+        }
+
+        unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () 
-> true);
+      }
+    }
+
+    @Override
+    public void unreserve(long deferTime) {
+
+      if (deferTime < 0) {
+        throw new IllegalArgumentException("deferTime < 0 : " + deferTime);
+      }
+
+      synchronized (AbstractFateStore.this) {
+        if (!reserved.remove(tid)) {
+          throw new IllegalStateException(
+              "Tried to unreserve id that was not reserved " + 
FateTxId.formatTid(tid));
+        }
+
+        // notify any threads waiting to reserve
+        AbstractFateStore.this.notifyAll();
+
+        if (deferTime > 0) {
+          defered.put(tid, System.currentTimeMillis() + deferTime);
+        }
+      }
+
+      if (observedStatus != null && isRunnable(observedStatus)) {
+        unreservedRunnableCount.increment();
+      }
+
+      if (observedStatus != TStatus.NEW) {
+        unreservedNonNewCount.increment();
+      }
+    }
+
+    protected void verifyReserved(boolean isWrite) {
+      if (!isReserved && isWrite) {
+        throw new IllegalStateException("Attempted write on unreserved FATE 
transaction.");
+      }
+
+      if (isReserved) {
+        synchronized (AbstractFateStore.this) {
+          if (!reserved.contains(tid)) {
+            throw new IllegalStateException(
+                "Tried to operate on unreserved transaction " + 
FateTxId.formatTid(tid));
+          }
+        }
+      }
+    }
+
+    @Override
+    public TStatus getStatus() {
+      verifyReserved(false);
+      var status = _getStatus(tid);
+      observedStatus = status;
+      return status;
+    }
+
+    @Override
+    public long getID() {
+      return tid;
+    }
+
+    protected byte[] serializeTxInfo(Serializable so) {
+      if (so instanceof String) {
+        return ("S " + so).getBytes(UTF_8);
+      } else {
+        byte[] sera = serialize(so);
+        byte[] data = new byte[sera.length + 2];
+        System.arraycopy(sera, 0, data, 2, sera.length);
+        data[0] = 'O';
+        data[1] = ' ';
+        return data;
+      }
+    }
+
+    protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
+      if (data[0] == 'O') {
+        byte[] sera = new byte[data.length - 2];
+        System.arraycopy(data, 2, sera, 0, sera.length);
+        return (Serializable) deserialize(sera);
+      } else if (data[0] == 'S') {
+        return new String(data, 2, data.length - 2, UTF_8);
+      } else {
+        throw new IllegalStateException("Bad node data " + txInfo);
+      }
+    }
+
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
index 7b2e3e8f38..969aed0717 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
@@ -23,24 +23,10 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -53,70 +39,23 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
 //TODO use zoocache? - ACCUMULO-1297
 //TODO handle zookeeper being down gracefully - ACCUMULO-1297
 
-public class ZooStore<T> implements FateStore<T> {
+public class ZooStore<T> extends AbstractFateStore<T> {
 
   private static final Logger log = LoggerFactory.getLogger(ZooStore.class);
   private String path;
   private ZooReaderWriter zk;
-  private Set<Long> reserved;
-  private Map<Long,Long> defered;
-
-  // This is incremented each time a transaction was unreserved that was non 
new
-  private final SignalCount unreservedNonNewCount = new SignalCount();
-
-  // This is incremented each time a transaction is unreserved that was 
runnable
-  private final SignalCount unreservedRunnableCount = new SignalCount();
-
-  private byte[] serialize(Object o) {
-
-    try {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      ObjectOutputStream oos = new ObjectOutputStream(baos);
-      oos.writeObject(o);
-      oos.close();
-
-      return baos.toByteArray();
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
-  }
-
-  @SuppressFBWarnings(value = "OBJECT_DESERIALIZATION",
-      justification = "unsafe to store arbitrary serialized objects like this, 
but needed for now"
-          + " for backwards compatibility")
-  private Object deserialize(byte[] ser) {
-    try {
-      ByteArrayInputStream bais = new ByteArrayInputStream(ser);
-      ObjectInputStream ois = new ObjectInputStream(bais);
-      return ois.readObject();
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    } catch (ReflectiveOperationException e) {
-      throw new IllegalStateException(e);
-    }
-  }
 
   private String getTXPath(long tid) {
     return FastFormat.toHexString(path + "/tx_", tid, "");
   }
 
-  private long parseTid(String txdir) {
-    return Long.parseLong(txdir.split("_")[1], 16);
-  }
-
   public ZooStore(String path, ZooReaderWriter zk) throws KeeperException, 
InterruptedException {
-
+    super();
     this.path = path;
     this.zk = zk;
-    this.reserved = new HashSet<>();
-    this.defered = new HashMap<>();
 
     zk.putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP);
   }
@@ -143,94 +82,10 @@ public class ZooStore<T> implements FateStore<T> {
     }
   }
 
-  @Override
-  public FateTxStore<T> reserve(long tid) {
-    synchronized (ZooStore.this) {
-      while (reserved.contains(tid)) {
-        try {
-          ZooStore.this.wait(100);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new IllegalStateException(e);
-        }
-      }
-
-      reserved.add(tid);
-      return new FateTxStoreImpl(tid, true);
-    }
-  }
-
-  /**
-   * Attempt to reserve transaction
-   *
-   * @param tid transaction id
-   * @return true if reserved by this call, false if already reserved
-   */
-  @Override
-  public Optional<FateTxStore<T>> tryReserve(long tid) {
-    synchronized (this) {
-      if (!reserved.contains(tid)) {
-        return Optional.of(reserve(tid));
-      }
-      return Optional.empty();
-    }
-  }
-
-  private class FateTxStoreImpl implements FateTxStore<T> {
-
-    private final long tid;
-    private final boolean isReserved;
-
-    private TStatus observedStatus = null;
+  private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> {
 
     private FateTxStoreImpl(long tid, boolean isReserved) {
-      this.tid = tid;
-      this.isReserved = isReserved;
-    }
-
-    @Override
-    public void unreserve(long deferTime) {
-
-      if (deferTime < 0) {
-        throw new IllegalArgumentException("deferTime < 0 : " + deferTime);
-      }
-
-      synchronized (ZooStore.this) {
-        if (!reserved.remove(tid)) {
-          throw new IllegalStateException(
-              "Tried to unreserve id that was not reserved " + 
FateTxId.formatTid(tid));
-        }
-
-        // notify any threads waiting to reserve
-        ZooStore.this.notifyAll();
-
-        if (deferTime > 0) {
-          defered.put(tid, System.currentTimeMillis() + deferTime);
-        }
-      }
-
-      if (observedStatus != null && isRunnable(observedStatus)) {
-        unreservedRunnableCount.increment();
-      }
-
-      if (observedStatus != TStatus.NEW) {
-        unreservedNonNewCount.increment();
-      }
-    }
-
-    private void verifyReserved(boolean isWrite) {
-      if (!isReserved && isWrite) {
-        throw new IllegalStateException("Attempted write on unreserved FATE 
transaction.");
-      }
-
-      if (isReserved) {
-        synchronized (ZooStore.this) {
-          if (!reserved.contains(tid)) {
-            throw new IllegalStateException(
-                "Tried to operate on unreserved transaction " + 
FateTxId.formatTid(tid));
-          }
-        }
-      }
+      super(tid, isReserved);
     }
 
     private static final int RETRIES = 10;
@@ -322,31 +177,6 @@ public class ZooStore<T> implements FateStore<T> {
       }
     }
 
-    @Override
-    public TStatus getStatus() {
-      verifyReserved(false);
-      var status = _getStatus(tid);
-      observedStatus = status;
-      return _getStatus(tid);
-    }
-
-    @Override
-    public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
-      Preconditions.checkState(!isReserved,
-          "Attempted to wait for status change while reserved " + 
FateTxId.formatTid(getID()));
-      while (true) {
-
-        long countBefore = unreservedNonNewCount.getCount();
-
-        TStatus status = _getStatus(tid);
-        if (expected.contains(status)) {
-          return status;
-        }
-
-        unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () 
-> true);
-      }
-    }
-
     @Override
     public void setStatus(TStatus status) {
       verifyReserved(true);
@@ -377,17 +207,8 @@ public class ZooStore<T> implements FateStore<T> {
       verifyReserved(true);
 
       try {
-        if (so instanceof String) {
-          zk.putPersistentData(getTXPath(tid) + "/" + txInfo, ("S " + 
so).getBytes(UTF_8),
-              NodeExistsPolicy.OVERWRITE);
-        } else {
-          byte[] sera = serialize(so);
-          byte[] data = new byte[sera.length + 2];
-          System.arraycopy(sera, 0, data, 2, sera.length);
-          data[0] = 'O';
-          data[1] = ' ';
-          zk.putPersistentData(getTXPath(tid) + "/" + txInfo, data, 
NodeExistsPolicy.OVERWRITE);
-        }
+        zk.putPersistentData(getTXPath(tid) + "/" + txInfo, 
serializeTxInfo(so),
+            NodeExistsPolicy.OVERWRITE);
       } catch (KeeperException | InterruptedException e2) {
         throw new IllegalStateException(e2);
       }
@@ -398,17 +219,7 @@ public class ZooStore<T> implements FateStore<T> {
       verifyReserved(false);
 
       try {
-        byte[] data = zk.getData(getTXPath(tid) + "/" + txInfo);
-
-        if (data[0] == 'O') {
-          byte[] sera = new byte[data.length - 2];
-          System.arraycopy(data, 2, sera, 0, sera.length);
-          return (Serializable) deserialize(sera);
-        } else if (data[0] == 'S') {
-          return new String(data, 2, data.length - 2, UTF_8);
-        } else {
-          throw new IllegalStateException("Bad node data " + txInfo);
-        }
+        return deserializeTxInfo(txInfo, zk.getData(getTXPath(tid) + "/" + 
txInfo));
       } catch (NoNodeException nne) {
         return null;
       } catch (KeeperException | InterruptedException e) {
@@ -428,11 +239,6 @@ public class ZooStore<T> implements FateStore<T> {
       }
     }
 
-    @Override
-    public long getID() {
-      return tid;
-    }
-
     @Override
     public List<ReadOnlyRepo<T>> getStack() {
       verifyReserved(false);
@@ -475,7 +281,8 @@ public class ZooStore<T> implements FateStore<T> {
     }
   }
 
-  private TStatus _getStatus(long tid) {
+  @Override
+  protected TStatus _getStatus(long tid) {
     try {
       return TStatus.valueOf(new String(zk.getData(getTXPath(tid)), UTF_8));
     } catch (NoNodeException nne) {
@@ -486,88 +293,17 @@ public class ZooStore<T> implements FateStore<T> {
   }
 
   @Override
-  public ReadOnlyFateTxStore<T> read(long tid) {
-    return new FateTxStoreImpl(tid, false);
+  protected FateTxStore<T> newFateTxStore(long tid, boolean isReserved) {
+    return new FateTxStoreImpl(tid, isReserved);
   }
 
   @Override
-  public List<Long> list() {
+  protected List<String> getTransactions() {
     try {
-      ArrayList<Long> l = new ArrayList<>();
-      List<String> transactions = zk.getChildren(path);
-      for (String txid : transactions) {
-        l.add(parseTid(txid));
-      }
-      return l;
+      return zk.getChildren(path);
     } catch (KeeperException | InterruptedException e) {
       throw new IllegalStateException(e);
     }
   }
 
-  private boolean isRunnable(TStatus status) {
-    return status == TStatus.IN_PROGRESS || status == 
TStatus.FAILED_IN_PROGRESS
-        || status == TStatus.SUBMITTED;
-  }
-
-  @Override
-  public Iterator<Long> runnable(AtomicBoolean keepWaiting) {
-
-    while (keepWaiting.get()) {
-      ArrayList<Long> runnableTids = new ArrayList<>();
-
-      final long beforeCount = unreservedRunnableCount.getCount();
-
-      try {
-
-        List<String> transactions = zk.getChildren(path);
-        for (String txidStr : transactions) {
-          long txid = parseTid(txidStr);
-          if (isRunnable(_getStatus(txid))) {
-            runnableTids.add(txid);
-          }
-        }
-
-        synchronized (this) {
-          runnableTids.removeIf(txid -> {
-            var deferedTime = defered.get(txid);
-            if (deferedTime != null) {
-              if (deferedTime >= System.currentTimeMillis()) {
-                return true;
-              } else {
-                defered.remove(txid);
-              }
-            }
-
-            if (reserved.contains(txid)) {
-              return true;
-            }
-
-            return false;
-          });
-        }
-
-        if (runnableTids.isEmpty()) {
-          if (beforeCount == unreservedRunnableCount.getCount()) {
-            long waitTime = 5000;
-            if (!defered.isEmpty()) {
-              Long minTime = Collections.min(defered.values());
-              waitTime = minTime - System.currentTimeMillis();
-            }
-
-            if (waitTime > 0) {
-              unreservedRunnableCount.waitFor(count -> count != beforeCount, 
waitTime,
-                  keepWaiting::get);
-            }
-          }
-        } else {
-          return runnableTids.iterator();
-        }
-
-      } catch (KeeperException | InterruptedException e) {
-        throw new IllegalStateException(e);
-      }
-    }
-
-    return List.<Long>of().iterator();
-  }
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
new file mode 100644
index 0000000000..aa5883a6d8
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate.accumulo;
+
+import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.Fate.TxInfo;
+import org.apache.accumulo.core.fate.ReadOnlyRepo;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.fate.StackOverflowException;
+import 
org.apache.accumulo.core.fate.accumulo.schema.FateSchema.RepoColumnFamily;
+import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily;
+import 
org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxInfoColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.FastFormat;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+
+public class AccumuloStore<T> extends AbstractFateStore<T> {
+
+  private final ClientContext context;
+  private final String tableName;
+
+  private static final int maxRepos = 100;
+  private static final com.google.common.collect.Range<Integer> REPO_RANGE =
+      com.google.common.collect.Range.closed(1, maxRepos);
+
+  public AccumuloStore(ClientContext context, String tableName) {
+    this.context = Objects.requireNonNull(context);
+    this.tableName = Objects.requireNonNull(tableName);
+  }
+
+  @Override
+  public long create() {
+    long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL;
+
+    
newMutator(tid).putStatus(TStatus.NEW).putCreateTime(System.currentTimeMillis()).mutate();
+
+    return tid;
+  }
+
+  @Override
+  protected List<String> getTransactions() {
+    return scanTx(scanner -> {
+      scanner.setRange(new Range());
+      TxColumnFamily.STATUS_COLUMN.fetch(scanner);
+      return StreamSupport.stream(scanner.spliterator(), false)
+          .map(e -> 
e.getKey().getRow().toString()).collect(Collectors.toList());
+    });
+  }
+
+  @Override
+  protected TStatus _getStatus(long tid) {
+    return scanTx(scanner -> {
+      scanner.setRange(getRow(tid));
+      TxColumnFamily.STATUS_COLUMN.fetch(scanner);
+      return StreamSupport.stream(scanner.spliterator(), false)
+          .map(e -> 
TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN);
+    });
+  }
+
+  @Override
+  protected FateTxStore<T> newFateTxStore(long tid, boolean isReserved) {
+    return new FateTxStoreImpl(tid, isReserved);
+  }
+
+  static Range getRow(long tid) {
+    return new Range("tx_" + FastFormat.toHexString(tid));
+  }
+
+  private FateMutatorImpl<T> newMutator(long tid) {
+    return new FateMutatorImpl<>(context, tableName, tid);
+  }
+
+  private <R> R scanTx(Function<Scanner,R> func) {
+    try (Scanner scanner = context.createScanner(tableName, 
Authorizations.EMPTY)) {
+      return func.apply(scanner);
+    } catch (TableNotFoundException e) {
+      throw new IllegalStateException(tableName + " not found!", e);
+    }
+  }
+
+  private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> {
+
+    private FateTxStoreImpl(long tid, boolean isReserved) {
+      super(tid, isReserved);
+    }
+
+    @Override
+    public Repo<T> top() {
+      verifyReserved(false);
+
+      return scanTx(scanner -> {
+        scanner.setRange(getRow(tid));
+        scanner.setBatchSize(1);
+        scanner.fetchColumnFamily(RepoColumnFamily.NAME);
+        return StreamSupport.stream(scanner.spliterator(), false).map(e -> {
+          @SuppressWarnings("unchecked")
+          var repo = (Repo<T>) deserialize(e.getValue().get());
+          return repo;
+        }).findFirst().orElse(null);
+      });
+    }
+
+    @Override
+    public List<ReadOnlyRepo<T>> getStack() {
+      verifyReserved(false);
+
+      return scanTx(scanner -> {
+        scanner.setRange(getRow(tid));
+        scanner.fetchColumnFamily(RepoColumnFamily.NAME);
+        return StreamSupport.stream(scanner.spliterator(), false).map(e -> {
+          @SuppressWarnings("unchecked")
+          var repo = (ReadOnlyRepo<T>) deserialize(e.getValue().get());
+          return repo;
+        }).collect(Collectors.toList());
+      });
+    }
+
+    @Override
+    public Serializable getTransactionInfo(TxInfo txInfo) {
+      verifyReserved(false);
+
+      try (Scanner scanner = context.createScanner(tableName, 
Authorizations.EMPTY)) {
+        scanner.setRange(getRow(tid));
+
+        final ColumnFQ cq;
+        switch (txInfo) {
+          case TX_NAME:
+            cq = TxInfoColumnFamily.TX_NAME_COLUMN;
+            break;
+          case AUTO_CLEAN:
+            cq = TxInfoColumnFamily.AUTO_CLEAN_COLUMN;
+            break;
+          case EXCEPTION:
+            cq = TxInfoColumnFamily.EXCEPTION_COLUMN;
+            break;
+          case RETURN_VALUE:
+            cq = TxInfoColumnFamily.RETURN_VALUE_COLUMN;
+            break;
+          default:
+            throw new IllegalArgumentException("Unexpected TxInfo type " + 
txInfo);
+        }
+        scanner.fetchColumn(cq.getColumnFamily(), cq.getColumnQualifier());
+
+        return StreamSupport.stream(scanner.spliterator(), false)
+            .map(e -> deserializeTxInfo(txInfo, 
e.getValue().get())).findFirst().orElse(null);
+      } catch (TableNotFoundException e) {
+        throw new IllegalStateException(tableName + " not found!", e);
+      }
+    }
+
+    @Override
+    public long timeCreated() {
+      verifyReserved(false);
+
+      return scanTx(scanner -> {
+        scanner.setRange(getRow(tid));
+        TxColumnFamily.CREATE_TIME_COLUMN.fetch(scanner);
+        return StreamSupport.stream(scanner.spliterator(), false)
+            .map(e -> 
Long.parseLong(e.getValue().toString())).findFirst().orElse(0L);
+      });
+    }
+
+    @Override
+    public void push(Repo<T> repo) throws StackOverflowException {
+      verifyReserved(true);
+
+      try {
+        Optional<Integer> top = findTop();
+
+        if (top.filter(t -> t >= maxRepos).isPresent()) {
+          throw new StackOverflowException("Repo stack size too large");
+        }
+
+        FateMutator<T> fateMutator = newMutator(tid);
+        fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate();
+      } catch (StackOverflowException soe) {
+        throw soe;
+      }
+    }
+
+    @Override
+    public void pop() {
+      verifyReserved(true);
+
+      Optional<Integer> top = findTop();
+      top.ifPresent(t -> newMutator(tid).deleteRepo(t).mutate());
+    }
+
+    @Override
+    public void setStatus(TStatus status) {
+      verifyReserved(true);
+
+      newMutator(tid).putStatus(status).mutate();
+      observedStatus = status;
+    }
+
+    @Override
+    public void setTransactionInfo(TxInfo txInfo, Serializable so) {
+      verifyReserved(true);
+
+      FateMutator<T> fateMutator = newMutator(tid);
+      final byte[] serialized = serializeTxInfo(so);
+
+      switch (txInfo) {
+        case TX_NAME:
+          fateMutator.putName(serialized);
+          break;
+        case AUTO_CLEAN:
+          fateMutator.putAutoClean(serialized);
+          break;
+        case EXCEPTION:
+          fateMutator.putException(serialized);
+          break;
+        case RETURN_VALUE:
+          fateMutator.putReturnValue(serialized);
+          break;
+        default:
+          throw new IllegalArgumentException("Unexpected TxInfo type " + 
txInfo);
+      }
+
+      fateMutator.mutate();
+    }
+
+    @Override
+    public void delete() {
+      verifyReserved(true);
+
+      newMutator(tid).delete().mutate();
+    }
+
+    private Optional<Integer> findTop() {
+      return scanTx(scanner -> {
+        scanner.setRange(getRow(tid));
+        scanner.setBatchSize(1);
+        scanner.fetchColumnFamily(RepoColumnFamily.NAME);
+        return StreamSupport.stream(scanner.spliterator(), false)
+            .map(e -> 
restoreRepo(e.getKey().getColumnQualifier())).findFirst();
+      });
+    }
+  }
+
+  static Text invertRepo(int position) {
+    Preconditions.checkArgument(REPO_RANGE.contains(position),
+        "Position %s is not in the valid range of [0,%s]", position, maxRepos);
+    return new Text(String.format("%02d", maxRepos - position));
+  }
+
+  static Integer restoreRepo(Text invertedPosition) {
+    int position = maxRepos - Integer.parseInt(invertedPosition.toString());
+    Preconditions.checkArgument(REPO_RANGE.contains(position),
+        "Position %s is not in the valid range of [0,%s]", position, maxRepos);
+    return position;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java 
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java
new file mode 100644
index 0000000000..306841612e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate.accumulo;
+
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.fate.Repo;
+
+public interface FateMutator<T> {
+
+  FateMutator<T> putStatus(TStatus status);
+
+  FateMutator<T> putCreateTime(long ctime);
+
+  FateMutator<T> putName(byte[] data);
+
+  FateMutator<T> putAutoClean(byte[] data);
+
+  FateMutator<T> putException(byte[] data);
+
+  FateMutator<T> putReturnValue(byte[] data);
+
+  FateMutator<T> putTxInfo(Fate.TxInfo txInfo, byte[] data);
+
+  FateMutator<T> putRepo(int position, Repo<T> repo);
+
+  FateMutator<T> deleteRepo(int position);
+
+  void mutate();
+
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java
new file mode 100644
index 0000000000..b605b91097
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate.accumulo;
+
+import static org.apache.accumulo.core.fate.AbstractFateStore.serialize;
+import static org.apache.accumulo.core.fate.accumulo.AccumuloStore.getRow;
+import static org.apache.accumulo.core.fate.accumulo.AccumuloStore.invertRepo;
+
+import java.util.Objects;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.fate.Fate.TxInfo;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.fate.Repo;
+import 
org.apache.accumulo.core.fate.accumulo.schema.FateSchema.RepoColumnFamily;
+import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily;
+import 
org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxInfoColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.FastFormat;
+import org.apache.hadoop.io.Text;
+
+public class FateMutatorImpl<T> implements FateMutator<T> {
+
+  private final ClientContext context;
+  private final String tableName;
+  private final long tid;
+  private final Mutation mutation;
+
+  FateMutatorImpl(ClientContext context, String tableName, long tid) {
+    this.context = Objects.requireNonNull(context);
+    this.tableName = Objects.requireNonNull(tableName);
+    this.tid = tid;
+    this.mutation = new Mutation(new Text("tx_" + 
FastFormat.toHexString(tid)));
+  }
+
+  @Override
+  public FateMutator<T> putStatus(TStatus status) {
+    TxColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name()));
+    return this;
+  }
+
+  @Override
+  public FateMutator<T> putCreateTime(long ctime) {
+    TxColumnFamily.CREATE_TIME_COLUMN.put(mutation, new 
Value(Long.toString(ctime)));
+    return this;
+  }
+
+  @Override
+  public FateMutator<T> putName(byte[] data) {
+    TxInfoColumnFamily.TX_NAME_COLUMN.put(mutation, new Value(data));
+    return this;
+  }
+
+  @Override
+  public FateMutator<T> putAutoClean(byte[] data) {
+    TxInfoColumnFamily.AUTO_CLEAN_COLUMN.put(mutation, new Value(data));
+    return this;
+  }
+
+  @Override
+  public FateMutator<T> putException(byte[] data) {
+    TxInfoColumnFamily.EXCEPTION_COLUMN.put(mutation, new Value(data));
+    return this;
+  }
+
+  @Override
+  public FateMutator<T> putReturnValue(byte[] data) {
+    TxInfoColumnFamily.RETURN_VALUE_COLUMN.put(mutation, new Value(data));
+    return this;
+  }
+
+  @Override
+  public FateMutator<T> putTxInfo(TxInfo txInfo, byte[] data) {
+    switch (txInfo) {
+      case TX_NAME:
+        putName(data);
+        break;
+      case AUTO_CLEAN:
+        putAutoClean(data);
+        break;
+      case EXCEPTION:
+        putException(data);
+        break;
+      case RETURN_VALUE:
+        putReturnValue(data);
+        break;
+    }
+    return this;
+  }
+
+  @Override
+  public FateMutator<T> putRepo(int position, Repo<T> repo) {
+    mutation.put(RepoColumnFamily.NAME, invertRepo(position), new 
Value(serialize(repo)));
+    return this;
+  }
+
+  @Override
+  public FateMutator<T> deleteRepo(int position) {
+    mutation.putDelete(RepoColumnFamily.NAME, invertRepo(position));
+    return this;
+  }
+
+  public FateMutator<T> delete() {
+    try (Scanner scanner = context.createScanner(tableName, 
Authorizations.EMPTY)) {
+      scanner.setRange(getRow(tid));
+      scanner.forEach(
+          (key, value) -> mutation.putDelete(key.getColumnFamily(), 
key.getColumnQualifier()));
+    } catch (TableNotFoundException e) {
+      throw new IllegalStateException(tableName + " not found!", e);
+    }
+    return this;
+  }
+
+  @Override
+  public void mutate() {
+    try (BatchWriter writer = context.createBatchWriter(tableName)) {
+      writer.addMutation(mutation);
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/schema/FateSchema.java
 
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/schema/FateSchema.java
new file mode 100644
index 0000000000..dbb84049a8
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/schema/FateSchema.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate.accumulo.schema;
+
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.hadoop.io.Text;
+
+public class FateSchema {
+
+  public static class TxColumnFamily {
+    public static final String STR_NAME = "tx";
+    public static final Text NAME = new Text(STR_NAME);
+
+    public static final String STATUS = "status";
+    public static final ColumnFQ STATUS_COLUMN = new ColumnFQ(NAME, new 
Text(STATUS));
+
+    public static final String CREATE_TIME = "ctime";
+    public static final ColumnFQ CREATE_TIME_COLUMN = new ColumnFQ(NAME, new 
Text(CREATE_TIME));
+  }
+
+  public static class TxInfoColumnFamily {
+    public static final String STR_NAME = "txinfo";
+    public static final Text NAME = new Text(STR_NAME);
+
+    public static final String TX_NAME = "txname";
+    public static final ColumnFQ TX_NAME_COLUMN = new ColumnFQ(NAME, new 
Text(TX_NAME));
+
+    public static final String AUTO_CLEAN = "autoclean";
+    public static final ColumnFQ AUTO_CLEAN_COLUMN = new ColumnFQ(NAME, new 
Text(AUTO_CLEAN));
+
+    public static final String EXCEPTION = "exception";
+    public static final ColumnFQ EXCEPTION_COLUMN = new ColumnFQ(NAME, new 
Text(EXCEPTION));
+
+    public static final String RETURN_VALUE = "retval";
+    public static final ColumnFQ RETURN_VALUE_COLUMN = new ColumnFQ(NAME, new 
Text(RETURN_VALUE));
+  }
+
+  public static class RepoColumnFamily {
+    public static final String STR_NAME = "repos";
+    public static final Text NAME = new Text(STR_NAME);
+  }
+
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
new file mode 100644
index 0000000000..217f68e5c7
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
+import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS;
+import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW;
+import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
+import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.NoSuchElementException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.util.Wait;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class FateIT extends SharedMiniClusterBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FateIT.class);
+
+  private static CountDownLatch callStarted;
+  private static CountDownLatch finishCall;
+
+  public static class TestEnv {
+
+  }
+
+  public static class TestRepo implements Repo<TestEnv> {
+    private static final long serialVersionUID = 1L;
+
+    private final String data;
+
+    public TestRepo(String data) {
+      this.data = data;
+    }
+
+    @Override
+    public long isReady(long tid, TestEnv environment) throws Exception {
+      return 0;
+    }
+
+    @Override
+    public String getName() {
+      return "TestRepo_" + data;
+    }
+
+    @Override
+    public Repo<TestEnv> call(long tid, TestEnv environment) throws Exception {
+      LOG.debug("Entering call {}", FateTxId.formatTid(tid));
+      try {
+        FateIT.inCall();
+        return null;
+      } finally {
+        LOG.debug("Leaving call {}", FateTxId.formatTid(tid));
+      }
+    }
+
+    @Override
+    public void undo(long tid, TestEnv environment) throws Exception {
+
+    }
+
+    @Override
+    public String getReturn() {
+      return data + "_ret";
+    }
+  }
+
+  @Test
+  @Timeout(30)
+  public void testTransactionStatus() throws Exception {
+    executeTest(this::testTransactionStatus);
+  }
+
+  protected void testTransactionStatus(FateStore<TestEnv> store, ServerContext 
sctx)
+      throws Exception {
+    ConfigurationCopy config = new ConfigurationCopy();
+    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+    TestEnv testEnv = new TestEnv();
+    Fate<TestEnv> fate = new Fate<>(testEnv, store, r -> r + "", config);
+    try {
+
+      // Wait for the transaction runner to be scheduled.
+      Thread.sleep(3000);
+
+      callStarted = new CountDownLatch(1);
+      finishCall = new CountDownLatch(1);
+
+      long txid = fate.startTransaction();
+      assertEquals(TStatus.NEW, getTxStatus(sctx, txid));
+      fate.seedTransaction("TestOperation", txid, new 
TestRepo("testTransactionStatus"), true,
+          "Test Op");
+      assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, txid));
+      // wait for call() to be called
+      callStarted.await();
+      assertEquals(IN_PROGRESS, getTxStatus(sctx, txid));
+      // tell the op to exit the method
+      finishCall.countDown();
+
+      // TODO: This check seems like a race condition that might
+      // need to be fixed as occasionally the test fails because it was
+      // already removed so that seems to indicate things are removed
+      // before can check it was SUCCESSFUL
+      TStatus s = getTxStatus(sctx, txid);
+      while (s != SUCCESSFUL) {
+        s = getTxStatus(sctx, txid);
+        Thread.sleep(10);
+      }
+      // Check that it gets removed
+      boolean removed = false;
+      while (!removed) {
+        removed = verifyRemoved(sctx, txid);
+        Thread.sleep(10);
+      }
+
+    } finally {
+      fate.shutdown();
+    }
+  }
+
+  @Test
+  public void testCancelWhileNew() throws Exception {
+    executeTest(this::testCancelWhileNew);
+  }
+
+  protected void testCancelWhileNew(FateStore<TestEnv> store, ServerContext 
sctx) throws Exception {
+    ConfigurationCopy config = new ConfigurationCopy();
+    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+    TestEnv testEnv = new TestEnv();
+    Fate<TestEnv> fate = new Fate<>(testEnv, store, r -> r + "", config);
+    try {
+
+      // Wait for the transaction runner to be scheduled.
+      Thread.sleep(3000);
+
+      callStarted = new CountDownLatch(1);
+      finishCall = new CountDownLatch(1);
+
+      long txid = fate.startTransaction();
+      LOG.debug("Starting test testCancelWhileNew with {}", 
FateTxId.formatTid(txid));
+      assertEquals(NEW, getTxStatus(sctx, txid));
+      // cancel the transaction
+      assertTrue(fate.cancel(txid));
+      assertTrue(
+          FAILED_IN_PROGRESS == getTxStatus(sctx, txid) || FAILED == 
getTxStatus(sctx, txid));
+      fate.seedTransaction("TestOperation", txid, new 
TestRepo("testCancelWhileNew"), true,
+          "Test Op");
+      Wait.waitFor(() -> FAILED == getTxStatus(sctx, txid));
+      // nothing should have run
+      assertEquals(1, callStarted.getCount());
+      fate.delete(txid);
+      assertThrows(getNoTxExistsException(), () -> getTxStatus(sctx, txid));
+    } finally {
+      fate.shutdown();
+    }
+  }
+
+  @Test
+  public void testCancelWhileSubmittedAndRunning() throws Exception {
+    executeTest(this::testCancelWhileSubmittedAndRunning);
+  }
+
+  protected void testCancelWhileSubmittedAndRunning(FateStore<TestEnv> store, 
ServerContext sctx)
+      throws Exception {
+    ConfigurationCopy config = new ConfigurationCopy();
+    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+    TestEnv testEnv = new TestEnv();
+    Fate<TestEnv> fate = new Fate<>(testEnv, store, r -> r + "", config);
+    try {
+
+      // Wait for the transaction runner to be scheduled.
+      Thread.sleep(3000);
+
+      callStarted = new CountDownLatch(1);
+      finishCall = new CountDownLatch(1);
+
+      long txid = fate.startTransaction();
+      LOG.debug("Starting test testCancelWhileSubmitted with {}", 
FateTxId.formatTid(txid));
+      assertEquals(NEW, getTxStatus(sctx, txid));
+      fate.seedTransaction("TestOperation", txid,
+          new TestRepo("testCancelWhileSubmittedAndRunning"), false, "Test 
Op");
+      Wait.waitFor(() -> IN_PROGRESS == getTxStatus(sctx, txid));
+      // This is false because the transaction runner has reserved the FaTe
+      // transaction.
+      assertFalse(fate.cancel(txid));
+      callStarted.await();
+      finishCall.countDown();
+      Wait.waitFor(() -> IN_PROGRESS != getTxStatus(sctx, txid));
+      fate.delete(txid);
+      assertThrows(getNoTxExistsException(), () -> getTxStatus(sctx, txid));
+    } finally {
+      fate.shutdown();
+    }
+  }
+
+  @Test
+  public void testCancelWhileInCall() throws Exception {
+    executeTest(this::testCancelWhileInCall);
+  }
+
+  protected void testCancelWhileInCall(FateStore<TestEnv> store, ServerContext 
sctx)
+      throws Exception {
+    ConfigurationCopy config = new ConfigurationCopy();
+    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+    TestEnv testEnv = new TestEnv();
+    Fate<TestEnv> fate = new Fate<>(testEnv, store, r -> r + "", config);
+    try {
+
+      // Wait for the transaction runner to be scheduled.
+      Thread.sleep(3000);
+
+      callStarted = new CountDownLatch(1);
+      finishCall = new CountDownLatch(1);
+
+      long txid = fate.startTransaction();
+      LOG.debug("Starting test testCancelWhileInCall with {}", 
FateTxId.formatTid(txid));
+      assertEquals(NEW, getTxStatus(sctx, txid));
+      fate.seedTransaction("TestOperation", txid, new 
TestRepo("testCancelWhileInCall"), true,
+          "Test Op");
+      assertEquals(SUBMITTED, getTxStatus(sctx, txid));
+      // wait for call() to be called
+      callStarted.await();
+      // cancel the transaction
+      assertFalse(fate.cancel(txid));
+    } finally {
+      fate.shutdown();
+    }
+
+  }
+
+  protected abstract TStatus getTxStatus(ServerContext sctx, long txid) throws 
Exception;
+
+  protected abstract boolean verifyRemoved(ServerContext sctx, long txid);
+
+  protected abstract void executeTest(FateTestExecutor testMethod) throws 
Exception;
+
+  protected interface FateTestExecutor {
+    void execute(FateStore<TestEnv> store, ServerContext sctx) throws 
Exception;
+  }
+
+  private static void inCall() throws InterruptedException {
+    // signal that call started
+    callStarted.countDown();
+    // wait for the signal to exit the method
+    finishCall.await();
+  }
+
+  protected Class<? extends Exception> getNoTxExistsException() {
+    return NoSuchElementException.class;
+  }
+
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java
new file mode 100644
index 0000000000..fe33cb92c1
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.accumulo;
+
+import java.util.NoSuchElementException;
+import java.util.stream.StreamSupport;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.fate.accumulo.AccumuloStore;
+import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.FastFormat;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.fate.FateIT;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+public class AccumuloFateIT extends FateIT {
+
+  private String table;
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    SharedMiniClusterBase.startMiniCluster();
+  }
+
+  @AfterAll
+  public static void teardown() {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  @Override
+  protected void executeTest(FateTestExecutor testMethod) throws Exception {
+    table = getUniqueNames(1)[0];
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table);
+
+      final AccumuloStore<TestEnv> accumuloStore = new AccumuloStore<>(client, 
table);
+      testMethod.execute(accumuloStore, getCluster().getServerContext());
+    }
+  }
+
+  @Override
+  protected TStatus getTxStatus(ServerContext context, long txid) {
+    try (Scanner scanner = context.createScanner(table, Authorizations.EMPTY)) 
{
+      scanner.setRange(getRow(txid));
+      TxColumnFamily.STATUS_COLUMN.fetch(scanner);
+      return StreamSupport.stream(scanner.spliterator(), false)
+          .map(e -> 
TStatus.valueOf(e.getValue().toString())).findFirst().orElseThrow();
+    } catch (TableNotFoundException e) {
+      throw new IllegalStateException(table + " not found!", e);
+    }
+  }
+
+  @Override
+  protected boolean verifyRemoved(ServerContext sctx, long txid) {
+    try {
+      getTxStatus(sctx, txid);
+    } catch (NoSuchElementException e) {
+      return true;
+    }
+    return false;
+  }
+
+  private static Range getRow(long tid) {
+    return new Range("tx_" + FastFormat.toHexString(tid));
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java
 
b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java
new file mode 100644
index 0000000000..1629458588
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.accumulo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Duration;
+import java.util.List;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.fate.Fate.TxInfo;
+import org.apache.accumulo.core.fate.FateStore.FateTxStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.fate.ReadOnlyRepo;
+import org.apache.accumulo.core.fate.accumulo.AccumuloStore;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.test.fate.FateIT.TestEnv;
+import org.apache.accumulo.test.fate.FateIT.TestRepo;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class AccumuloStoreReadWriteIT extends SharedMiniClusterBase {
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    SharedMiniClusterBase.startMiniCluster();
+  }
+
+  @AfterAll
+  public static void teardown() {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  @Override
+  protected Duration defaultTimeout() {
+    return Duration.ofMinutes(5);
+  }
+
+  @Test
+  public void testReadWrite() throws Exception {
+    final String table = getUniqueNames(1)[0];
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table);
+
+      AccumuloStore<TestEnv> store = new AccumuloStore<>(client, table);
+      // Verify no transactions
+      assertEquals(0, store.list().size());
+
+      // Create a new transaction and get the store for it
+      long tid = store.create();
+      FateTxStore<TestEnv> txStore = store.reserve(tid);
+      assertTrue(txStore.timeCreated() > 0);
+      assertEquals(1, store.list().size());
+
+      // Push a test FATE op and verify we can read it back
+      txStore.push(new TestRepo("testOp"));
+      TestRepo op = (TestRepo) txStore.top();
+      assertNotNull(op);
+
+      // Test status
+      txStore.setStatus(TStatus.SUBMITTED);
+      assertEquals(TStatus.SUBMITTED, txStore.getStatus());
+
+      // Set a name to test setTransactionInfo()
+      txStore.setTransactionInfo(TxInfo.TX_NAME, "name");
+      assertEquals("name", txStore.getTransactionInfo(TxInfo.TX_NAME));
+
+      // Try setting a second test op to test getStack()
+      // when listing or popping TestOperation2 should be first
+      assertEquals(1, txStore.getStack().size());
+      txStore.push(new TestOperation2());
+      // test top returns TestOperation2
+      ReadOnlyRepo<TestEnv> top = txStore.top();
+      assertInstanceOf(TestOperation2.class, top);
+
+      // test get stack
+      List<ReadOnlyRepo<TestEnv>> ops = txStore.getStack();
+      assertEquals(2, ops.size());
+      assertInstanceOf(TestOperation2.class, ops.get(0));
+      assertEquals(TestRepo.class, ops.get(1).getClass());
+
+      // test pop, TestOperation should be left
+      txStore.pop();
+      ops = txStore.getStack();
+      assertEquals(1, ops.size());
+      assertEquals(TestRepo.class, ops.get(0).getClass());
+
+      // create second
+      FateTxStore<TestEnv> txStore2 = store.reserve(store.create());
+      assertEquals(2, store.list().size());
+
+      // test delete
+      txStore.delete();
+      assertEquals(1, store.list().size());
+      txStore2.delete();
+      assertEquals(0, store.list().size());
+    }
+  }
+
+  private static class TestOperation2 extends TestRepo {
+
+    private static final long serialVersionUID = 1L;
+
+    public TestOperation2() {
+      super("testOperation2");
+    }
+  }
+
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
deleted file mode 100644
index 8ba43996a7..0000000000
--- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.test.fate.zookeeper;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
-import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS;
-import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS;
-import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW;
-import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
-import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL;
-import static 
org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-
-import java.io.File;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
-import org.apache.accumulo.core.conf.ConfigurationCopy;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.NamespaceId;
-import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.fate.AgeOffStore;
-import org.apache.accumulo.core.fate.Fate;
-import org.apache.accumulo.core.fate.FateTxId;
-import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
-import org.apache.accumulo.core.fate.Repo;
-import org.apache.accumulo.core.fate.ZooStore;
-import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.manager.Manager;
-import org.apache.accumulo.manager.tableOps.ManagerRepo;
-import org.apache.accumulo.manager.tableOps.TraceRepo;
-import org.apache.accumulo.manager.tableOps.Utils;
-import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.test.util.Wait;
-import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer;
-import org.apache.zookeeper.KeeperException;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Tag(ZOOKEEPER_TESTING_SERVER)
-public class FateIT {
-
-  public static class TestOperation extends ManagerRepo {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(TestOperation.class);
-
-    private static final long serialVersionUID = 1L;
-
-    private final TableId tableId;
-    private final NamespaceId namespaceId;
-
-    public TestOperation(NamespaceId namespaceId, TableId tableId) {
-      this.namespaceId = namespaceId;
-      this.tableId = tableId;
-    }
-
-    @Override
-    public long isReady(long tid, Manager manager) throws Exception {
-      return Utils.reserveNamespace(manager, namespaceId, tid, false, true, 
TableOperation.RENAME)
-          + Utils.reserveTable(manager, tableId, tid, true, true, 
TableOperation.RENAME);
-    }
-
-    @Override
-    public void undo(long tid, Manager manager) throws Exception {
-      Utils.unreserveNamespace(manager, namespaceId, tid, false);
-      Utils.unreserveTable(manager, tableId, tid, true);
-    }
-
-    @Override
-    public Repo<Manager> call(long tid, Manager manager) throws Exception {
-      LOG.debug("Entering call {}", FateTxId.formatTid(tid));
-      try {
-        FateIT.inCall();
-        return null;
-      } finally {
-        Utils.unreserveNamespace(manager, namespaceId, tid, false);
-        Utils.unreserveTable(manager, tableId, tid, true);
-        LOG.debug("Leaving call {}", FateTxId.formatTid(tid));
-      }
-
-    }
-
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(FateIT.class);
-
-  @TempDir
-  private static File tempDir;
-
-  private static ZooKeeperTestingServer szk = null;
-  private static ZooReaderWriter zk = null;
-  private static final String ZK_ROOT = "/accumulo/" + 
UUID.randomUUID().toString();
-  private static final NamespaceId NS = NamespaceId.of("testNameSpace");
-  private static final TableId TID = TableId.of("testTable");
-
-  private static CountDownLatch callStarted;
-  private static CountDownLatch finishCall;
-
-  @BeforeAll
-  public static void setup() throws Exception {
-    szk = new ZooKeeperTestingServer(tempDir);
-    zk = szk.getZooReaderWriter();
-    zk.mkdirs(ZK_ROOT + Constants.ZFATE);
-    zk.mkdirs(ZK_ROOT + Constants.ZTABLE_LOCKS);
-    zk.mkdirs(ZK_ROOT + Constants.ZNAMESPACES + "/" + NS.canonical());
-    zk.mkdirs(ZK_ROOT + Constants.ZTABLE_STATE + "/" + TID.canonical());
-    zk.mkdirs(ZK_ROOT + Constants.ZTABLES + "/" + TID.canonical());
-  }
-
-  @AfterAll
-  public static void teardown() throws Exception {
-    szk.close();
-  }
-
-  @Test
-  @Timeout(30)
-  public void testTransactionStatus() throws Exception {
-
-    final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + 
Constants.ZFATE, zk);
-    final AgeOffStore<Manager> store =
-        new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
-
-    Manager manager = createMock(Manager.class);
-    ServerContext sctx = createMock(ServerContext.class);
-    expect(manager.getContext()).andReturn(sctx).anyTimes();
-    expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes();
-    expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
-    replay(manager, sctx);
-
-    ConfigurationCopy config = new ConfigurationCopy();
-    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
-    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString, config);
-    try {
-
-      // Wait for the transaction runner to be scheduled.
-      Thread.sleep(3000);
-
-      callStarted = new CountDownLatch(1);
-      finishCall = new CountDownLatch(1);
-
-      long txid = fate.startTransaction();
-      assertEquals(TStatus.NEW, getTxStatus(zk, txid));
-      fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), 
true, "Test Op");
-      assertEquals(TStatus.SUBMITTED, getTxStatus(zk, txid));
-      // wait for call() to be called
-      callStarted.await();
-      assertEquals(IN_PROGRESS, getTxStatus(zk, txid));
-      // tell the op to exit the method
-      finishCall.countDown();
-      // Check that it transitions to SUCCESSFUL
-      TStatus s = getTxStatus(zk, txid);
-      while (s != SUCCESSFUL) {
-        s = getTxStatus(zk, txid);
-        Thread.sleep(10);
-      }
-      // Check that it gets removed
-      boolean errorSeen = false;
-      while (!errorSeen) {
-        try {
-          s = getTxStatus(zk, txid);
-          Thread.sleep(10);
-        } catch (KeeperException e) {
-          if (e.code() == KeeperException.Code.NONODE) {
-            errorSeen = true;
-          } else {
-            fail("Unexpected error thrown: " + e.getMessage());
-          }
-        }
-      }
-
-    } finally {
-      fate.shutdown();
-    }
-  }
-
-  @Test
-  public void testCancelWhileNew() throws Exception {
-    final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + 
Constants.ZFATE, zk);
-    final AgeOffStore<Manager> store =
-        new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
-
-    Manager manager = createMock(Manager.class);
-    ServerContext sctx = createMock(ServerContext.class);
-    expect(manager.getContext()).andReturn(sctx).anyTimes();
-    expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes();
-    expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
-    replay(manager, sctx);
-
-    ConfigurationCopy config = new ConfigurationCopy();
-    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
-    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString, config);
-    try {
-
-      // Wait for the transaction runner to be scheduled.
-      Thread.sleep(3000);
-
-      callStarted = new CountDownLatch(1);
-      finishCall = new CountDownLatch(1);
-
-      long txid = fate.startTransaction();
-      LOG.debug("Starting test testCancelWhileNew with {}", 
FateTxId.formatTid(txid));
-      assertEquals(NEW, getTxStatus(zk, txid));
-      // cancel the transaction
-      assertTrue(fate.cancel(txid));
-      assertTrue(FAILED_IN_PROGRESS == getTxStatus(zk, txid) || FAILED == 
getTxStatus(zk, txid));
-      fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), 
true, "Test Op");
-      Wait.waitFor(() -> FAILED == getTxStatus(zk, txid));
-      // nothing should have run
-      assertEquals(1, callStarted.getCount());
-      fate.delete(txid);
-      assertThrows(KeeperException.NoNodeException.class, () -> 
getTxStatus(zk, txid));
-    } finally {
-      fate.shutdown();
-    }
-  }
-
-  @Test
-  public void testCancelWhileSubmittedAndRunning() throws Exception {
-    final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + 
Constants.ZFATE, zk);
-    final AgeOffStore<Manager> store =
-        new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
-
-    Manager manager = createMock(Manager.class);
-    ServerContext sctx = createMock(ServerContext.class);
-    expect(manager.getContext()).andReturn(sctx).anyTimes();
-    expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes();
-    expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
-    replay(manager, sctx);
-
-    ConfigurationCopy config = new ConfigurationCopy();
-    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
-    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString, config);
-    try {
-
-      // Wait for the transaction runner to be scheduled.
-      Thread.sleep(3000);
-
-      callStarted = new CountDownLatch(1);
-      finishCall = new CountDownLatch(1);
-
-      long txid = fate.startTransaction();
-      LOG.debug("Starting test testCancelWhileSubmitted with {}", 
FateTxId.formatTid(txid));
-      assertEquals(NEW, getTxStatus(zk, txid));
-      fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), 
false, "Test Op");
-      Wait.waitFor(() -> IN_PROGRESS == getTxStatus(zk, txid));
-      // This is false because the transaction runner has reserved the FaTe
-      // transaction.
-      assertFalse(fate.cancel(txid));
-      callStarted.await();
-      finishCall.countDown();
-      Wait.waitFor(() -> IN_PROGRESS != getTxStatus(zk, txid));
-      fate.delete(txid);
-      assertThrows(KeeperException.NoNodeException.class, () -> 
getTxStatus(zk, txid));
-    } finally {
-      fate.shutdown();
-    }
-  }
-
-  @Test
-  public void testCancelWhileInCall() throws Exception {
-    final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + 
Constants.ZFATE, zk);
-    final AgeOffStore<Manager> store =
-        new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
-
-    Manager manager = createMock(Manager.class);
-    ServerContext sctx = createMock(ServerContext.class);
-    expect(manager.getContext()).andReturn(sctx).anyTimes();
-    expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes();
-    expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
-    replay(manager, sctx);
-
-    ConfigurationCopy config = new ConfigurationCopy();
-    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
-    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString, config);
-    try {
-
-      // Wait for the transaction runner to be scheduled.
-      Thread.sleep(3000);
-
-      callStarted = new CountDownLatch(1);
-      finishCall = new CountDownLatch(1);
-
-      long txid = fate.startTransaction();
-      LOG.debug("Starting test testCancelWhileInCall with {}", 
FateTxId.formatTid(txid));
-      assertEquals(NEW, getTxStatus(zk, txid));
-      fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), 
true, "Test Op");
-      assertEquals(SUBMITTED, getTxStatus(zk, txid));
-      // wait for call() to be called
-      callStarted.await();
-      // cancel the transaction
-      assertFalse(fate.cancel(txid));
-    } finally {
-      fate.shutdown();
-    }
-
-  }
-
-  private static void inCall() throws InterruptedException {
-    // signal that call started
-    callStarted.countDown();
-    // wait for the signal to exit the method
-    finishCall.await();
-  }
-
-  /*
-   * Get the status of the TX from ZK directly. Unable to call 
ZooStore.getStatus because this test
-   * thread does not have the reservation (the FaTE thread does)
-   */
-  private static TStatus getTxStatus(ZooReaderWriter zrw, long txid)
-      throws KeeperException, InterruptedException {
-    zrw.sync(ZK_ROOT);
-    String txdir = String.format("%s%s/tx_%016x", ZK_ROOT, Constants.ZFATE, 
txid);
-    return TStatus.valueOf(new String(zrw.getData(txdir), UTF_8));
-  }
-
-}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java
 
b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java
new file mode 100644
index 0000000000..b9c29e85bb
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.zookeeper;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.io.File;
+import java.util.UUID;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.fate.AgeOffStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.fate.ZooStore;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.fate.FateIT;
+import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer;
+import org.apache.zookeeper.KeeperException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.io.TempDir;
+
+@Tag(ZOOKEEPER_TESTING_SERVER)
+public class ZookeeperFateIT extends FateIT {
+
+  private static ZooKeeperTestingServer szk = null;
+  private static ZooReaderWriter zk = null;
+  private static final String ZK_ROOT = "/accumulo/" + UUID.randomUUID();
+
+  @TempDir
+  private static File tempDir;
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    szk = new ZooKeeperTestingServer(tempDir);
+    zk = szk.getZooReaderWriter();
+    zk.mkdirs(ZK_ROOT + Constants.ZFATE);
+    zk.mkdirs(ZK_ROOT + Constants.ZTABLE_LOCKS);
+  }
+
+  @AfterAll
+  public static void teardown() throws Exception {
+    szk.close();
+  }
+
+  @Override
+  protected void executeTest(FateTestExecutor testMethod) throws Exception {
+    final ZooStore<TestEnv> zooStore = new ZooStore<>(ZK_ROOT + 
Constants.ZFATE, zk);
+    final AgeOffStore<TestEnv> store = new AgeOffStore<>(zooStore, 3000, 
System::currentTimeMillis);
+
+    ServerContext sctx = createMock(ServerContext.class);
+    expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes();
+    expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
+    replay(sctx);
+
+    testMethod.execute(store, sctx);
+  }
+
+  @Override
+  protected Class<? extends Exception> getNoTxExistsException() {
+    return KeeperException.NoNodeException.class;
+  }
+
+  @Override
+  protected TStatus getTxStatus(ServerContext sctx, long txid)
+      throws InterruptedException, KeeperException {
+    return getTxStatus(sctx.getZooReaderWriter(), txid);
+  }
+
+  @Override
+  protected boolean verifyRemoved(ServerContext sctx, long txid) {
+    try {
+      getTxStatus(sctx, txid);
+    } catch (KeeperException e) {
+      if (e.code() == KeeperException.Code.NONODE) {
+        return true;
+      } else {
+        fail("Unexpected error thrown: " + e.getMessage());
+      }
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(e);
+    }
+    return false;
+  }
+
+  /*
+   * Get the status of the TX from ZK directly. Unable to call 
ZooStore.getStatus because this test
+   * thread does not have the reservation (the FaTE thread does)
+   */
+  private static TStatus getTxStatus(ZooReaderWriter zrw, long txid)
+      throws KeeperException, InterruptedException {
+    zrw.sync(ZK_ROOT);
+    String txdir = String.format("%s%s/tx_%016x", ZK_ROOT, Constants.ZFATE, 
txid);
+    return TStatus.valueOf(new String(zrw.getData(txdir), UTF_8));
+  }
+
+}

Reply via email to