This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 6407c31147 Makes seeding a fate transaction more efficient (#5122)
6407c31147 is described below

commit 6407c31147423f1eacc861477a9d176e746660b0
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Mon Dec 9 19:21:28 2024 -0500

    Makes seeding a fate transaction more efficient (#5122)
    
    Modified fate to seed a fate transaction in a single conditional mutation
    instead of multiple.
    
    fixes #5097
    
    
    Co-authored-by: Kevin Rathbun <kevinrr...@gmail.com>
---
 .../accumulo/core/fate/AbstractFateStore.java      |  11 +
 .../java/org/apache/accumulo/core/fate/Fate.java   |  51 +----
 .../org/apache/accumulo/core/fate/FateStore.java   |  42 +++-
 .../accumulo/core/fate/user/FateMutator.java       |  23 ++
 .../accumulo/core/fate/user/FateMutatorImpl.java   |  38 +++-
 .../accumulo/core/fate/user/RowExistsIterator.java |  46 ++++
 .../accumulo/core/fate/user/UserFateStore.java     | 140 ++++--------
 .../core/fate/zookeeper/MetaFateStore.java         |  54 ++++-
 .../apache/accumulo/core/logging/FateLogger.java   |  43 ++--
 .../org/apache/accumulo/core/fate/TestStore.java   |  11 +-
 .../metadata/iterators/SetEncodingIterator.java    |   2 +-
 .../coordinator/CompactionCoordinator.java         |   7 +-
 .../accumulo/manager/split/SeedSplitTask.java      |  16 +-
 .../test/compaction/ExternalCompaction_1_IT.java   |  24 +-
 .../java/org/apache/accumulo/test/fate/FateIT.java |   4 +
 .../org/apache/accumulo/test/fate/FateStoreIT.java | 252 ++++++++++++++++++---
 16 files changed, 529 insertions(+), 235 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
index ff5e45d310..3bc322c3c2 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@ -69,6 +69,11 @@ public abstract class AbstractFateStore<T> implements 
FateStore<T> {
       UUID txUUID = UUID.nameUUIDFromBytes(fateKey.getSerialized());
       return FateId.from(instanceType, txUUID);
     }
+
+    @Override
+    public FateId newRandomId(FateInstanceType instanceType) {
+      return FateId.from(instanceType, UUID.randomUUID());
+    }
   };
 
   // The ZooKeeper lock for the process that's running this store instance
@@ -402,6 +407,12 @@ public abstract class AbstractFateStore<T> implements 
FateStore<T> {
 
   public interface FateIdGenerator {
     FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey);
+
+    FateId newRandomId(FateInstanceType instanceType);
+  }
+
+  protected void seededTx() {
+    unreservedRunnableCount.increment();
   }
 
   protected byte[] serializeTxInfo(Serializable so) {
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index f46cc1aa43..fc405920d5 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -63,8 +63,6 @@ import org.apache.thrift.TApplicationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Fault tolerant executor
  */
@@ -439,57 +437,16 @@ public class Fate<T> {
     return store.create();
   }
 
-  public Optional<FateId> seedTransaction(String txName, FateKey fateKey, 
Repo<T> repo,
-      boolean autoCleanUp, String goalMessage) {
-
-    Optional<FateTxStore<T>> optTxStore = store.createAndReserve(fateKey);
-
-    return optTxStore.map(txStore -> {
-      var fateId = txStore.getID();
-      try {
-        Preconditions.checkState(txStore.getStatus() == NEW);
-        seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage, 
txStore);
-      } finally {
-        txStore.unreserve(Duration.ZERO);
-      }
-      return fateId;
-    });
-  }
-
-  private void seedTransaction(String txName, FateId fateId, Repo<T> repo, 
boolean autoCleanUp,
-      String goalMessage, FateTxStore<T> txStore) {
-    if (txStore.top() == null) {
-      try {
-        log.info("Seeding {} {}", fateId, goalMessage);
-        txStore.push(repo);
-      } catch (StackOverflowException e) {
-        // this should not happen
-        throw new IllegalStateException(e);
-      }
-    }
-
-    if (autoCleanUp) {
-      txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp);
-    }
-
-    txStore.setTransactionInfo(TxInfo.TX_NAME, txName);
-
-    txStore.setStatus(SUBMITTED);
+  public void seedTransaction(String txName, FateKey fateKey, Repo<T> repo, 
boolean autoCleanUp) {
+    store.seedTransaction(txName, fateKey, repo, autoCleanUp);
   }
 
   // start work in the transaction.. it is safe to call this
   // multiple times for a transaction... but it will only seed once
   public void seedTransaction(String txName, FateId fateId, Repo<T> repo, 
boolean autoCleanUp,
       String goalMessage) {
-    FateTxStore<T> txStore = store.reserve(fateId);
-    try {
-      if (txStore.getStatus() == NEW) {
-        seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage, 
txStore);
-      }
-    } finally {
-      txStore.unreserve(Duration.ZERO);
-    }
-
+    log.info("Seeding {} {}", fateId, goalMessage);
+    store.seedTransaction(txName, fateId, repo, autoCleanUp);
   }
 
   // check on the transaction
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
index 09ee12dd94..d434770461 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
@@ -50,19 +50,41 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
   FateId create();
 
   /**
-   * Creates and reserves a transaction using the given key. If something is 
already running for the
-   * given key, then Optional.empty() will be returned. When this returns a 
non-empty id, it will be
-   * in the new state.
+   * Seeds a transaction with the given repo if it does not exist. A fateId 
will be derived from
+   * the fateKey. If seeded, sets the following data for the fateId in the 
store.
    *
-   * <p>
-   * In the case where a process dies in the middle of a call to this. If 
later, another call is
-   * made with the same key and its in the new state then the FateId for that 
key will be returned.
-   * </p>
+   * <ul>
+   * <li>Set the tx name</li>
+   * <li>Set the status to SUBMITTED</li>
+   * <li>Set the fate key</li>
+   * <li>Sets autocleanup only if true</li>
+   * <li>Sets the creation time</li>
+   * </ul>
    *
-   * @throws IllegalStateException when there is an unexpected collision. This 
can occur if two key
-   *         hash to the same FateId or if a random FateId already exists.
+   * @return The return value is only intended for testing; it may not be 
correct in the face of
+   *         failures. When there are no failures returns optional w/ the fate 
id set if seeded and
+   *         empty optional otherwise. If there was a failure this could 
return an empty optional
+   *         when it actually succeeded.
    */
-  Optional<FateTxStore<T>> createAndReserve(FateKey fateKey);
+  Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T> 
repo,
+      boolean autoCleanUp);
+
+  /**
+   * Seeds a transaction with the given repo if its current status is NEW and 
it is currently
+   * unreserved. If seeded, sets the following data for the fateId in the 
store.
+   *
+   * <ul>
+   * <li>Set the tx name</li>
+   * <li>Set the status to SUBMITTED</li>
+   * <li>Sets autocleanup only if true</li>
+   * <li>Sets the creation time</li>
+   * </ul>
+   *
+   * @return The return value is only intended for testing; it may not be 
correct in the face of
+   *         failures. When there are no failures returns true if seeded and 
false otherwise. If
+   *         there was a failure this could return false when it actually 
succeeded.
+   */
+  boolean seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean 
autoCleanUp);
 
   /**
    * An interface that allows read/write access to the data related to a 
single fate operation.
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java 
b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
index d199a7463e..ac675d7fb9 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
@@ -33,6 +33,29 @@ public interface FateMutator<T> {
 
   FateMutator<T> putCreateTime(long ctime);
 
+  /**
+   * Requires that nothing exists for this fate mutation.
+   */
+  FateMutator<T> requireAbsent();
+
+  /**
+   * Require that the transaction status is one of the given statuses. If no 
statuses are provided,
+   * require that the status column is absent.
+   *
+   * @param statuses The statuses to check against.
+   */
+  FateMutator<T> requireStatus(TStatus... statuses);
+
+  /**
+   * Require the transaction has no reservation.
+   */
+  FateMutator<T> requireUnreserved();
+
+  /**
+   * Require the transaction has no fate key set.
+   */
+  FateMutator<T> requireAbsentKey();
+
   /**
    * Add a conditional mutation to {@link 
FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
    * put the reservation if there is not already a reservation present
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java 
b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
index 5d99a8df3a..ea7dd85c57 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -48,12 +49,16 @@ import 
org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.hadoop.io.Text;
 
+import com.google.common.base.Preconditions;
+
 public class FateMutatorImpl<T> implements FateMutator<T> {
 
   private final ClientContext context;
   private final String tableName;
   private final FateId fateId;
   private final ConditionalMutation mutation;
+  private boolean requiredUnreserved = false;
+  public static final int INITIAL_ITERATOR_PRIO = 1000000;
 
   public FateMutatorImpl(ClientContext context, String tableName, FateId 
fateId) {
     this.context = Objects.requireNonNull(context);
@@ -81,10 +86,34 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
   }
 
   @Override
-  public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
+  public FateMutator<T> requireAbsent() {
+    IteratorSetting is = new IteratorSetting(INITIAL_ITERATOR_PRIO, 
RowExistsIterator.class);
+    Condition c = new Condition("", "").setIterators(is);
+    mutation.addCondition(c);
+    return this;
+  }
+
+  @Override
+  public FateMutator<T> requireUnreserved() {
+    Preconditions.checkState(!requiredUnreserved);
     Condition condition = new 
Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
         TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
     mutation.addCondition(condition);
+    requiredUnreserved = true;
+    return this;
+  }
+
+  @Override
+  public FateMutator<T> requireAbsentKey() {
+    Condition condition = new 
Condition(TxColumnFamily.TX_KEY_COLUMN.getColumnFamily(),
+        TxColumnFamily.TX_KEY_COLUMN.getColumnQualifier());
+    mutation.addCondition(condition);
+    return this;
+  }
+
+  @Override
+  public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
+    requireUnreserved();
     TxColumnFamily.RESERVATION_COLUMN.put(mutation, new 
Value(reservation.getSerialized()));
     return this;
   }
@@ -179,12 +208,7 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
     return this;
   }
 
-  /**
-   * Require that the transaction status is one of the given statuses. If no 
statuses are provided,
-   * require that the status column is absent.
-   *
-   * @param statuses The statuses to check against.
-   */
+  @Override
   public FateMutator<T> requireStatus(TStatus... statuses) {
     Condition condition = StatusMappingIterator.createCondition(statuses);
     mutation.addCondition(condition);
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/RowExistsIterator.java 
b/core/src/main/java/org/apache/accumulo/core/fate/user/RowExistsIterator.java
new file mode 100644
index 0000000000..6095546b0d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/fate/user/RowExistsIterator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate.user;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Iterator is used by conditional mutations to check if row exists.
+ */
+public class RowExistsIterator extends WrappingIterator {
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive)
+      throws IOException {
+    Preconditions.checkState(range.getStartKey() != null && range.getEndKey() 
!= null);
+    var startRow = range.getStartKey().getRow();
+    var endRow = range.getEndKey().getRow();
+    Preconditions.checkState(startRow.equals(endRow));
+    Range r = new Range(startRow);
+    super.seek(r, Set.of(), false);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index efd0cbc62f..c134db1840 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@ -20,7 +20,6 @@ package org.apache.accumulo.core.fate.user;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.time.Duration;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map.Entry;
@@ -30,6 +29,7 @@ import java.util.SortedMap;
 import java.util.UUID;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -106,7 +106,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
         UtilWaitThread.sleep(100);
       }
 
-      var status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW)
+      var status = newMutator(fateId).requireAbsent().putStatus(TStatus.NEW)
           .putCreateTime(System.currentTimeMillis()).tryMutate();
 
       switch (status) {
@@ -123,104 +123,62 @@ public class UserFateStore<T> extends 
AbstractFateStore<T> {
   }
 
   public FateId getFateId() {
-    return FateId.from(fateInstanceType, UUID.randomUUID());
+    return fateIdGenerator.newRandomId(type());
   }
 
   @Override
-  public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
-    final var reservation = FateReservation.from(lockID, UUID.randomUUID());
+  public Optional<FateId> seedTransaction(String txName, FateKey fateKey, 
Repo<T> repo,
+      boolean autoCleanUp) {
     final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
-    Optional<FateTxStore<T>> txStore = Optional.empty();
-    int maxAttempts = 5;
-    FateMutator.Status status = null;
-
-    // Only need to retry if it is UNKNOWN
-    for (int attempt = 0; attempt < maxAttempts; attempt++) {
-      status = 
newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey)
-          
.putReservedTx(reservation).putCreateTime(System.currentTimeMillis()).tryMutate();
-      if (status != FateMutator.Status.UNKNOWN) {
-        break;
-      }
-      UtilWaitThread.sleep(100);
+    Supplier<FateMutator<T>> mutatorFactory = () -> 
newMutator(fateId).requireAbsent()
+        .putKey(fateKey).putCreateTime(System.currentTimeMillis());
+    if (seedTransaction(mutatorFactory, fateKey + " " + fateId, txName, repo, 
autoCleanUp)) {
+      return Optional.of(fateId);
+    } else {
+      return Optional.empty();
     }
+  }
 
-    switch (status) {
-      case ACCEPTED:
-        txStore = Optional.of(new FateTxStoreImpl(fateId, reservation));
-        break;
-      case REJECTED:
-        // If the status is REJECTED, we need to check what about the mutation 
was REJECTED:
-        // 1) Possible something like the following occurred:
-        // the first attempt was UNKNOWN but written, the next attempt would 
be rejected
-        // We return the FateTxStore in this case.
-        // 2) If there is a collision with existing fate id, throw error
-        // 3) If the fate id is already reserved, return an empty optional
-        // 4) If the fate id is still NEW/unseeded and unreserved, we can try 
to reserve it
-        try (Scanner scanner = context.createScanner(tableName, 
Authorizations.EMPTY)) {
-          scanner.setRange(getRow(fateId));
-          scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(),
-              TxColumnFamily.STATUS_COLUMN.getColumnQualifier());
-          scanner.fetchColumn(TxColumnFamily.TX_KEY_COLUMN.getColumnFamily(),
-              TxColumnFamily.TX_KEY_COLUMN.getColumnQualifier());
-          
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
-              TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
-          TStatus statusSeen = TStatus.UNKNOWN;
-          Optional<FateKey> fateKeySeen = Optional.empty();
-          Optional<FateReservation> reservationSeen = Optional.empty();
-
-          for (Entry<Key,Value> entry : scanner) {
-            Text colf = entry.getKey().getColumnFamily();
-            Text colq = entry.getKey().getColumnQualifier();
-            Value val = entry.getValue();
-
-            switch (colq.toString()) {
-              case TxColumnFamily.STATUS:
-                statusSeen = TStatus.valueOf(val.toString());
-                break;
-              case TxColumnFamily.TX_KEY:
-                fateKeySeen = Optional.of(FateKey.deserialize(val.get()));
-                break;
-              case TxColumnFamily.RESERVATION:
-                reservationSeen = 
Optional.of(FateReservation.deserialize(val.get()));
-                break;
-              default:
-                throw new IllegalStateException("Unexpected column seen: " + 
colf + ":" + colq);
-            }
-          }
+  @Override
+  public boolean seedTransaction(String txName, FateId fateId, Repo<T> repo, 
boolean autoCleanUp) {
+    Supplier<FateMutator<T>> mutatorFactory =
+        () -> 
newMutator(fateId).requireStatus(TStatus.NEW).requireUnreserved().requireAbsentKey();
+    return seedTransaction(mutatorFactory, fateId.canonical(), txName, repo, 
autoCleanUp);
+  }
 
-          if (statusSeen == TStatus.NEW) {
-            verifyFateKey(fateId, fateKeySeen, fateKey);
-            // This will be the case if the mutation status is REJECTED but 
the mutation was written
-            if (reservationSeen.isPresent() && 
reservationSeen.orElseThrow().equals(reservation)) {
-              txStore = Optional.of(new FateTxStoreImpl(fateId, reservation));
-            } else if (reservationSeen.isEmpty()) {
-              // NEW/unseeded transaction and not reserved, so we can allow it 
to be reserved
-              // we tryReserve() since another thread may have reserved it 
since the scan
-              txStore = tryReserve(fateId);
-              // the status was known before reserving to be NEW,
-              // however it could change so check after reserving to avoid 
race conditions.
-              var statusAfterReserve =
-                  
txStore.map(ReadOnlyFateTxStore::getStatus).orElse(TStatus.UNKNOWN);
-              if (statusAfterReserve != TStatus.NEW) {
-                txStore.ifPresent(txs -> txs.unreserve(Duration.ZERO));
-                txStore = Optional.empty();
-              }
-            }
-          } else {
-            log.trace(
-                "fate id {} tstatus {} fate key {} is reserved {} "
-                    + "has already been seeded with work (non-NEW status)",
-                fateId, statusSeen, fateKeySeen.orElse(null), 
reservationSeen.isPresent());
-          }
-        } catch (TableNotFoundException e) {
-          throw new IllegalStateException(tableName + " not found!", e);
-        }
-        break;
-      default:
-        throw new IllegalStateException("Unknown or unexpected status " + 
status);
+  private boolean seedTransaction(Supplier<FateMutator<T>> mutatorFactory, 
String logId,
+      String txName, Repo<T> repo, boolean autoCleanUp) {
+    int maxAttempts = 5;
+    for (int attempt = 0; attempt < maxAttempts; attempt++) {
+      var mutator = mutatorFactory.get();
+      mutator =
+          mutator.putName(serializeTxInfo(txName)).putRepo(1, 
repo).putStatus(TStatus.SUBMITTED);
+      if (autoCleanUp) {
+        mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp));
+      }
+      var status = mutator.tryMutate();
+      if (status == FateMutator.Status.ACCEPTED) {
+        // signal to the super class that a new fate transaction was seeded 
and is ready to run
+        seededTx();
+        log.trace("Attempt to seed {} returned {}", logId, status);
+        return true;
+      } else if (status == FateMutator.Status.REJECTED) {
+        log.debug("Attempt to seed {} returned {}", logId, status);
+        return false;
+      } else if (status == FateMutator.Status.UNKNOWN) {
+        // At this point can not reliably determine if the conditional 
mutation was successful or
+        // not because no reservation was acquired. For example since no 
reservation was acquired it
+        // is possible that seeding was a success and something immediately 
picked it up and started
+        // operating on it and changing it. Scanning after that point can 
not conclude success or
+        // failure. Another situation is that maybe the fateId already existed 
in a seeded form
+        // prior to getting this unknown.
+        log.debug("Attempt to seed {} returned {} status, retrying", logId, 
status);
+        UtilWaitThread.sleep(250);
+      }
     }
 
-    return txStore;
+    log.warn("Repeatedly received unknown status when attempting to seed {}", 
logId);
+    return false;
   }
 
   @Override
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
index d6da05e844..28c0904ffa 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
@@ -20,12 +20,15 @@ package org.apache.accumulo.core.fate.zookeeper;
 
 import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW;
+import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.UncheckedIOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -100,7 +103,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
   public FateId create() {
     while (true) {
       try {
-        FateId fateId = FateId.from(fateInstanceType, UUID.randomUUID());
+        FateId fateId = fateIdGenerator.newRandomId(fateInstanceType);
         zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, 
null).serialize(),
             NodeExistsPolicy.FAIL);
         return fateId;
@@ -112,8 +115,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
     }
   }
 
-  @Override
-  public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
+  private Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
     final var reservation = FateReservation.from(lockID, UUID.randomUUID());
     final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
 
@@ -161,6 +163,52 @@ public class MetaFateStore<T> extends AbstractFateStore<T> 
{
     }
   }
 
+  @Override
+  public Optional<FateId> seedTransaction(String txName, FateKey fateKey, 
Repo<T> repo,
+      boolean autoCleanUp) {
+    return createAndReserve(fateKey).map(txStore -> {
+      try {
+        seedTransaction(txName, repo, autoCleanUp, txStore);
+        return txStore.getID();
+      } finally {
+        txStore.unreserve(Duration.ZERO);
+      }
+    });
+  }
+
+  @Override
+  public boolean seedTransaction(String txName, FateId fateId, Repo<T> repo, 
boolean autoCleanUp) {
+    return tryReserve(fateId).map(txStore -> {
+      try {
+        if (txStore.getStatus() == NEW) {
+          seedTransaction(txName, repo, autoCleanUp, txStore);
+          return true;
+        }
+        return false;
+      } finally {
+        txStore.unreserve(Duration.ZERO);
+      }
+    }).orElse(false);
+  }
+
+  private void seedTransaction(String txName, Repo<T> repo, boolean 
autoCleanUp,
+      FateTxStore<T> txStore) {
+    if (txStore.top() == null) {
+      try {
+        txStore.push(repo);
+      } catch (StackOverflowException e) {
+        // this should not happen
+        throw new IllegalStateException(e);
+      }
+    }
+
+    if (autoCleanUp) {
+      txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp);
+    }
+    txStore.setTransactionInfo(TxInfo.TX_NAME, txName);
+    txStore.setStatus(SUBMITTED);
+  }
+
   @Override
   public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
     // uniquely identify this attempt to reserve the fate operation data
diff --git 
a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java 
b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
index 4a9f2517c0..9a5984f4ed 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
@@ -149,6 +149,33 @@ public class FateLogger {
         return fateId;
       }
 
+      @Override
+      public Optional<FateId> seedTransaction(String txName, FateKey fateKey, 
Repo<T> repo,
+          boolean autoCleanUp) {
+        var optional = store.seedTransaction(txName, fateKey, repo, 
autoCleanUp);
+        if (storeLog.isTraceEnabled()) {
+          optional.ifPresentOrElse(fateId -> {
+            storeLog.trace("{} seeded {} {} {}", fateId, fateKey, 
toLogString.apply(repo),
+                autoCleanUp);
+          }, () -> {
+            storeLog.trace("Possibly unable to seed {} {} {}", fateKey, 
toLogString.apply(repo),
+                autoCleanUp);
+          });
+        }
+        return optional;
+      }
+
+      @Override
+      public boolean seedTransaction(String txName, FateId fateId, Repo<T> 
repo,
+          boolean autoCleanUp) {
+        boolean seeded = store.seedTransaction(txName, fateId, repo, 
autoCleanUp);
+        if (storeLog.isTraceEnabled()) {
+          storeLog.trace("{} {} {} {}", fateId, seeded ? "seeded" : "unable to 
seed",
+              toLogString.apply(repo), autoCleanUp);
+        }
+        return seeded;
+      }
+
       @Override
       public int getDeferredCount() {
         return store.getDeferredCount();
@@ -164,22 +191,6 @@ public class FateLogger {
         return store.isDeferredOverflow();
       }
 
-      @Override
-      public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
-        Optional<FateTxStore<T>> txStore = store.createAndReserve(fateKey);
-        if (storeLog.isTraceEnabled()) {
-          if (txStore.isPresent()) {
-            storeLog.trace("{} created and reserved fate transaction using key 
: {}",
-                txStore.orElseThrow().getID(), fateKey);
-          } else {
-            storeLog.trace(
-                "fate transaction was not created using key : {}, existing 
transaction exists",
-                fateKey);
-          }
-        }
-        return txStore;
-      }
-
       @Override
       public Map<FateId,FateReservation> getActiveReservations() {
         return store.getActiveReservations();
diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java 
b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index 2c54464663..40d0d755b1 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@ -53,8 +53,15 @@ public class TestStore implements FateStore<String> {
   }
 
   @Override
-  public Optional<FateTxStore<String>> createAndReserve(FateKey key) {
-    throw new UnsupportedOperationException();
+  public Optional<FateId> seedTransaction(String txName, FateKey fateKey, 
Repo<String> repo,
+      boolean autoCleanUp) {
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean seedTransaction(String txName, FateId fateId, Repo<String> 
repo,
+      boolean autoCleanUp) {
+    return false;
   }
 
   @Override
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java
index b0456afd32..ebe732049f 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java
@@ -79,7 +79,7 @@ public class SetEncodingIterator implements 
SortedKeyValueIterator<Key,Value> {
     // expecting this range to cover a single metadata row, so validate the 
range meets expectations
     MetadataSchema.TabletsSection.validateRow(row);
     Preconditions.checkArgument(row.equals(range.getEndKey().getRow()));
-    return range.getStartKey().getRow();
+    return row;
   }
 
   @Override
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index e06229c21b..772676a491 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -747,11 +747,8 @@ public class CompactionCoordinator
     // Start a fate transaction to commit the compaction.
     CompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid);
     var renameOp = new RenameCompactionFile(new CompactionCommitData(ecid, 
extent, ecm, stats));
-    var txid = localFate.seedTransaction("COMMIT_COMPACTION", 
FateKey.forCompactionCommit(ecid),
-        renameOp, true, "Commit compaction " + ecid);
-
-    txid.ifPresentOrElse(fateId -> LOG.debug("initiated compaction commit {} 
{}", ecid, fateId),
-        () -> LOG.debug("compaction commit already initiated for {}", ecid));
+    localFate.seedTransaction("COMMIT_COMPACTION", 
FateKey.forCompactionCommit(ecid), renameOp,
+        true);
   }
 
   @Override
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
index 78f08a9471..7b56ea4388 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
@@ -18,10 +18,7 @@
  */
 package org.apache.accumulo.manager.split;
 
-import java.util.Optional;
-
 import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateKey;
 import org.apache.accumulo.manager.Manager;
@@ -44,17 +41,8 @@ public class SeedSplitTask implements Runnable {
   public void run() {
     try {
       var fateInstanceType = FateInstanceType.fromTableId((extent.tableId()));
-
-      Optional<FateId> optFateId =
-          manager.fate(fateInstanceType).seedTransaction("SYSTEM_SPLIT", 
FateKey.forSplit(extent),
-              new FindSplits(extent), true, "System initiated split of tablet 
" + extent);
-
-      optFateId.ifPresentOrElse(fateId -> {
-        log.trace("System initiated a split for : {} {}", extent, fateId);
-      }, () -> {
-        log.trace("System attempted to initiate a split but one was in 
progress : {}", extent);
-      });
-
+      manager.fate(fateInstanceType).seedTransaction("SYSTEM_SPLIT", 
FateKey.forSplit(extent),
+          new FindSplits(extent), true);
     } catch (Exception e) {
       log.error("Failed to split {}", extent, e);
     }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index e8955e465a..c8905cd850 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@ -78,6 +78,7 @@ import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateKey;
 import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.fate.user.UserFateStore;
 import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.core.iterators.DevNull;
@@ -96,6 +97,7 @@ import 
org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.server.util.FindCompactionTmpFiles;
@@ -332,6 +334,21 @@ public class ExternalCompaction_1_IT extends 
SharedMiniClusterBase {
     }
   }
 
+  public static class FakeRepo extends ManagerRepo {
+
+    private static final long serialVersionUID = 1234L;
+
+    @Override
+    public long isReady(FateId fateId, Manager environment) throws Exception {
+      return 1000;
+    }
+
+    @Override
+    public Repo<Manager> call(FateId fateId, Manager environment) throws 
Exception {
+      return null;
+    }
+  }
+
   private FateId createCompactionCommitAndDeadMetadata(AccumuloClient c,
       FateStore<Manager> fateStore, String tableName,
       Map<TableId,List<ExternalCompactionId>> allCids) throws Exception {
@@ -345,10 +362,9 @@ public class ExternalCompaction_1_IT extends 
SharedMiniClusterBase {
     // Create a fate transaction for one of the compaction ids that is in the 
new state, it
     // should never run. Its purpose is to prevent the dead compaction detector
     // from deleting the id.
-    FateStore.FateTxStore<Manager> fateTx = fateStore
-        
.createAndReserve(FateKey.forCompactionCommit(allCids.get(tableId).get(0))).orElseThrow();
-    var fateId = fateTx.getID();
-    fateTx.unreserve(Duration.ZERO);
+    Repo<Manager> repo = new FakeRepo();
+    var fateId = fateStore.seedTransaction("COMPACTION_COMMIT",
+        FateKey.forCompactionCommit(allCids.get(tableId).get(0)), repo, 
true).orElseThrow();
 
     // Read the tablet metadata
     var tabletsMeta = 
ctx.getAmple().readTablets().forTable(tableId).build().stream()
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
index e8a77bd330..f493292368 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
@@ -77,6 +77,10 @@ public abstract class FateIT extends SharedMiniClusterBase 
implements FateTestRu
 
     private final String data;
 
+    public TestRepo() {
+      this("test");
+    }
+
     public TestRepo(String data) {
       this.data = data;
     }
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java
index 64607cab7b..1980dcf4ee 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java
@@ -25,10 +25,10 @@ import static 
org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,6 +40,7 @@ import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.data.TableId;
@@ -47,6 +48,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.AbstractFateStore;
 import org.apache.accumulo.core.fate.Fate.TxInfo;
 import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateKey;
 import org.apache.accumulo.core.fate.FateStore;
 import org.apache.accumulo.core.fate.FateStore.FateTxStore;
@@ -296,18 +298,20 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
     FateKey fateKey2 =
         
FateKey.forCompactionCommit(ExternalCompactionId.generate(UUID.randomUUID()));
 
-    FateTxStore<TestEnv> txStore1 = 
store.createAndReserve(fateKey1).orElseThrow();
-    FateTxStore<TestEnv> txStore2 = 
store.createAndReserve(fateKey2).orElseThrow();
+    var fateId1 = store.seedTransaction("TEST", fateKey1, new TestRepo(), 
true).orElseThrow();
+    var fateId2 = store.seedTransaction("TEST", fateKey2, new TestRepo(), 
true).orElseThrow();
 
-    assertNotEquals(txStore1.getID(), txStore2.getID());
+    assertNotEquals(fateId1, fateId2);
 
+    var txStore1 = store.reserve(fateId1);
+    var txStore2 = store.reserve(fateId2);
     try {
       assertTrue(txStore1.timeCreated() > 0);
-      assertEquals(TStatus.NEW, txStore1.getStatus());
+      assertEquals(TStatus.SUBMITTED, txStore1.getStatus());
       assertEquals(fateKey1, txStore1.getKey().orElseThrow());
 
       assertTrue(txStore2.timeCreated() > 0);
-      assertEquals(TStatus.NEW, txStore2.getStatus());
+      assertEquals(TStatus.SUBMITTED, txStore2.getStatus());
       assertEquals(fateKey2, txStore2.getKey().orElseThrow());
 
       assertEquals(2, store.list().count());
@@ -328,18 +332,17 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
     KeyExtent ke =
         new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new 
Text("aaa"));
 
-    // Creating with the same key should be fine if the status is NEW
-    // A second call to createAndReserve() should just return an empty optional
-    // since it's already in reserved and in progress
     FateKey fateKey = FateKey.forSplit(ke);
-    FateTxStore<TestEnv> txStore = 
store.createAndReserve(fateKey).orElseThrow();
+    var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(), 
true).orElseThrow();
 
     // second call is empty
-    assertTrue(store.createAndReserve(fateKey).isEmpty());
+    assertTrue(store.seedTransaction("TEST", fateKey, new TestRepo(), 
true).isEmpty());
+    assertFalse(store.seedTransaction("TEST", fateId, new TestRepo(), true));
 
+    var txStore = store.reserve(fateId);
     try {
       assertTrue(txStore.timeCreated() > 0);
-      assertEquals(TStatus.NEW, txStore.getStatus());
+      assertEquals(TStatus.SUBMITTED, txStore.getStatus());
       assertEquals(fateKey, txStore.getKey().orElseThrow());
       assertEquals(1, store.list().count());
     } finally {
@@ -359,15 +362,16 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
         new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new 
Text("aaa"));
     FateKey fateKey = FateKey.forSplit(ke);
 
-    FateTxStore<TestEnv> txStore = 
store.createAndReserve(fateKey).orElseThrow();
+    var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(), 
true).orElseThrow();
 
+    var txStore = store.reserve(fateId);
     try {
       assertTrue(txStore.timeCreated() > 0);
       txStore.setStatus(TStatus.IN_PROGRESS);
 
       // We have an existing transaction with the same key in progress
       // so should return an empty Optional
-      assertTrue(store.createAndReserve(fateKey).isEmpty());
+      assertTrue(store.seedTransaction("TEST", fateKey, new TestRepo(), 
true).isEmpty());
       assertEquals(TStatus.IN_PROGRESS, txStore.getStatus());
     } finally {
       txStore.setStatus(TStatus.SUCCESSFUL);
@@ -375,14 +379,20 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
       txStore.unreserve(Duration.ZERO);
     }
 
+    txStore = null;
+
     try {
       // After deletion, make sure we can create again with the same key
-      txStore = store.createAndReserve(fateKey).orElseThrow();
+      var fateId2 = store.seedTransaction("TEST", fateKey, new TestRepo(), 
true).orElseThrow();
+      txStore = store.reserve(fateId);
+      assertEquals(fateId, fateId2);
       assertTrue(txStore.timeCreated() > 0);
-      assertEquals(TStatus.NEW, txStore.getStatus());
+      assertEquals(TStatus.SUBMITTED, txStore.getStatus());
     } finally {
-      txStore.delete();
-      txStore.unreserve(Duration.ZERO);
+      if (txStore != null) {
+        txStore.delete();
+        txStore.unreserve(Duration.ZERO);
+      }
     }
 
   }
@@ -392,8 +402,18 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
     // Replace the default hashing algorithm with one that always returns the 
same tid so
     // we can check duplicate detection with different keys
     executeTest(this::testCreateWithKeyCollision, 
AbstractFateStore.DEFAULT_MAX_DEFERRED,
-        (instanceType, fateKey) -> FateId.from(instanceType,
-            UUID.nameUUIDFromBytes("testing uuid".getBytes(UTF_8))));
+        new AbstractFateStore.FateIdGenerator() {
+          @Override
+          public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey 
fateKey) {
+            return FateId.from(instanceType,
+                UUID.nameUUIDFromBytes("testing uuid".getBytes(UTF_8)));
+          }
+
+          @Override
+          public FateId newRandomId(FateInstanceType instanceType) {
+            return FateId.from(instanceType, UUID.randomUUID());
+          }
+        });
   }
 
   protected void testCreateWithKeyCollision(FateStore<TestEnv> store, 
ServerContext sctx) {
@@ -404,13 +424,10 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
     FateKey fateKey1 = FateKey.forSplit(ke1);
     FateKey fateKey2 = FateKey.forSplit(ke2);
 
-    FateTxStore<TestEnv> txStore = 
store.createAndReserve(fateKey1).orElseThrow();
+    var fateId1 = store.seedTransaction("TEST", fateKey1, new TestRepo(), 
true).orElseThrow();
+    var txStore = store.reserve(fateId1);
     try {
-      var e = assertThrows(IllegalStateException.class, () -> 
store.createAndReserve(fateKey2));
-      assertEquals(
-          "Collision detected for fate id "
-              + FateId.from(store.type(), UUID.nameUUIDFromBytes("testing 
uuid".getBytes(UTF_8))),
-          e.getMessage());
+      assertTrue(store.seedTransaction("TEST", fateKey2, new TestRepo(), 
true).isEmpty());
       assertEquals(fateKey1, txStore.getKey().orElseThrow());
     } finally {
       txStore.delete();
@@ -430,26 +447,192 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
         new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new 
Text("aaa"));
 
     FateKey fateKey = FateKey.forSplit(ke);
-    FateTxStore<TestEnv> txStore = 
store.createAndReserve(fateKey).orElseThrow();
-    FateId fateId = txStore.getID();
+    var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(), 
true).orElseThrow();
 
-    // After createAndReserve a fate transaction using a key we can simulate a 
collision with
-    // a random FateId by deleting the key out of Fate and calling 
createAndReserve again to
+    // After seeding a fate transaction using a key we can simulate a 
collision with
+    // a random FateId by deleting the key out of Fate and calling seed again 
to
     // verify it detects the key is missing. Then we can continue and see if 
we can still use
     // the existing transaction.
     deleteKey(fateId, sctx);
-    var e = assertThrows(IllegalStateException.class, () -> 
store.createAndReserve(fateKey));
-    assertEquals("fate key is missing from fate id " + fateId, e.getMessage());
+    assertTrue(store.seedTransaction("TEST", fateKey, new TestRepo(), 
true).isEmpty());
 
+    var txStore = store.reserve(fateId);
     // We should still be able to use the existing transaction
     try {
       assertTrue(txStore.timeCreated() > 0);
+      assertEquals(TStatus.SUBMITTED, txStore.getStatus());
+    } finally {
+      txStore.delete();
+      txStore.unreserve(Duration.ZERO);
+    }
+  }
+
+  public static final UUID DUPLICATE_UUID = UUID.randomUUID();
+
+  public static final List<UUID> UUIDS = List.of(DUPLICATE_UUID, 
DUPLICATE_UUID, UUID.randomUUID());
+
+  @Test
+  public void testCreate() throws Exception {
+    AtomicInteger index = new AtomicInteger(0);
+    executeTest(this::testCreate, AbstractFateStore.DEFAULT_MAX_DEFERRED,
+        new AbstractFateStore.FateIdGenerator() {
+          @Override
+          public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey 
fateKey) {
+            return FateId.from(instanceType,
+                UUID.nameUUIDFromBytes("testing uuid".getBytes(UTF_8)));
+          }
+
+          @Override
+          public FateId newRandomId(FateInstanceType instanceType) {
+            return FateId.from(instanceType, UUIDS.get(index.getAndIncrement() 
% UUIDS.size()));
+          }
+        });
+  }
+
+  protected void testCreate(FateStore<TestEnv> store, ServerContext sctx) 
throws Exception {
+
+    var fateId1 = store.create();
+    assertEquals(UUIDS.get(0), fateId1.getTxUUID());
+
+    // This UUIDS[1] should collide with UUIDS[0] and then the code should 
retry and end up with UUIDS[2]
+    var fateId2 = store.create();
+    assertEquals(UUIDS.get(2), fateId2.getTxUUID());
+
+    for (var fateId : List.of(fateId1, fateId2)) {
+      var txStore = store.reserve(fateId);
+      try {
+        assertEquals(TStatus.NEW, txStore.getStatus());
+        assertTrue(txStore.timeCreated() > 0);
+        assertNull(txStore.top());
+        assertTrue(txStore.getKey().isEmpty());
+        assertEquals(fateId, txStore.getID());
+        assertTrue(txStore.getStack().isEmpty());
+      } finally {
+        txStore.unreserve(Duration.ZERO);
+      }
+    }
+
+    assertEquals(Set.of(fateId1, fateId2),
+        store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet()));
+
+    var txStore = store.reserve(fateId2);
+    try {
+      txStore.delete();
+    } finally {
+      txStore.unreserve(Duration.ZERO);
+    }
+
+    assertEquals(Set.of(fateId1),
+        store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet()));
+
+    txStore = store.reserve(fateId1);
+    try {
+      txStore.setStatus(TStatus.SUBMITTED);
+      txStore.setStatus(TStatus.IN_PROGRESS);
+      txStore.push(new TestRepo());
+    } finally {
+      txStore.unreserve(Duration.ZERO);
+    }
+
+    assertEquals(Set.of(fateId1),
+        store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet()));
+
+    // should collide again with the first fate id and go to the second
+    fateId2 = store.create();
+    assertEquals(UUIDS.get(2), fateId2.getTxUUID());
+
+    assertEquals(Set.of(fateId1, fateId2),
+        store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet()));
+
+    // ensure fateId1 was not altered in any way by creating fateId2 when it 
collided
+    txStore = store.reserve(fateId1);
+    try {
+      assertEquals(TStatus.IN_PROGRESS, txStore.getStatus());
+      assertNotNull(txStore.top());
+      txStore.forceDelete();
+    } finally {
+      txStore.unreserve(Duration.ZERO);
+    }
+
+    assertEquals(Set.of(fateId2),
+        store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet()));
+
+    txStore = store.reserve(fateId2);
+    try {
       assertEquals(TStatus.NEW, txStore.getStatus());
+      txStore.delete();
     } finally {
+      txStore.unreserve(Duration.ZERO);
+    }
+
+    // should be able to recreate something at the same id
+    fateId1 = store.create();
+    assertEquals(UUIDS.get(0), fateId1.getTxUUID());
+    txStore = store.reserve(fateId1);
+    try {
+      assertEquals(TStatus.NEW, txStore.getStatus());
+      assertTrue(txStore.timeCreated() > 0);
+      assertNull(txStore.top());
+      assertTrue(txStore.getKey().isEmpty());
+      assertEquals(fateId1, txStore.getID());
+      assertTrue(txStore.getStack().isEmpty());
       txStore.delete();
+    } finally {
       txStore.unreserve(Duration.ZERO);
     }
 
+    assertEquals(Set.of(), 
store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet()));
+
+  }
+
+  @Test
+  public void testConcurrent() throws Exception {
+    executeTest(this::testConcurrent);
+  }
+
+  protected void testConcurrent(FateStore<TestEnv> store, ServerContext sctx) 
throws Exception {
+    KeyExtent ke =
+        new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new 
Text("aaa"));
+    FateKey fateKey = FateKey.forSplit(ke);
+
+    var executor = Executors.newFixedThreadPool(10);
+    try {
+      // have 10 threads all try to seed the same fate key, only one should 
succeed.
+      List<Future<Optional<FateId>>> futures = new ArrayList<>(10);
+      for (int i = 0; i < 10; i++) {
+        futures.add(
+            executor.submit(() -> store.seedTransaction("TEST", fateKey, new 
TestRepo(), true)));
+      }
+
+      int idsSeen = 0;
+      for (var future : futures) {
+        if (future.get().isPresent()) {
+          idsSeen++;
+        }
+      }
+
+      assertEquals(1, idsSeen);
+      assertEquals(1, store.list(FateKey.FateKeyType.SPLIT).count());
+      assertEquals(0, 
store.list(FateKey.FateKeyType.COMPACTION_COMMIT).count());
+
+      for (var future : futures) {
+        if (future.get().isPresent()) {
+          var txStore = store.reserve(future.get().orElseThrow());
+          try {
+            txStore.delete();
+          } finally {
+            txStore.unreserve(Duration.ZERO);
+          }
+        }
+      }
+
+      assertEquals(0, store.list(FateKey.FateKeyType.SPLIT).count());
+      assertEquals(0, 
store.list(FateKey.FateKeyType.COMPACTION_COMMIT).count());
+
+    } finally {
+      executor.shutdown();
+    }
+
   }
 
   @Test
@@ -500,9 +683,8 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
 
     Map<FateKey,FateId> fateKeyIds = new HashMap<>();
     for (FateKey fateKey : List.of(fateKey1, fateKey2, fateKey3, fateKey4)) {
-      var fateTx = store.createAndReserve(fateKey).orElseThrow();
-      fateKeyIds.put(fateKey, fateTx.getID());
-      fateTx.unreserve(Duration.ZERO);
+      var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(), 
true).orElseThrow();
+      fateKeyIds.put(fateKey, fateId);
     }
 
     HashSet<FateId> allIds = new HashSet<>();

Reply via email to