This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new fba07b60a0 Remove meta fate specialization (#6227)
fba07b60a0 is described below

commit fba07b60a0ff7ac77866008187b43a7745f287d3
Author: Keith Turner <[email protected]>
AuthorDate: Thu Apr 2 15:09:49 2026 -0700

    Remove meta fate specialization (#6227)
    
    After this change meta fate and user fate are both treated mostly the
    same in the managers.  One difference is in assignment, the entire meta
    fate range is assigned to a single manager.  User fate is spread across
    all managers.  But both are assigned out by the primary manager using
    the same RPCs now.  The primary manager used to directly start a meta
    fate instance.
    
    Was able to remove the extension of FateEnv from the manager class in
    this change, that caused a ripple of test changes.  But now there are no
    longer two different implementations of FateEnv
---
 .../apache/accumulo/core/fate/FatePartition.java   |  4 +
 .../java/org/apache/accumulo/manager/Manager.java  | 86 +++------------------
 .../apache/accumulo/manager/fate/FateManager.java  | 90 +++++++++++++++-------
 .../apache/accumulo/manager/fate/FateWorker.java   | 83 +++++++++++---------
 .../tableOps/compact/CompactionDriverTest.java     | 14 ++--
 .../manager/tableOps/merge/MergeTabletsTest.java   | 12 +--
 .../manager/tableOps/split/UpdateTabletsTest.java  | 26 +++----
 .../accumulo/test/MultipleManagerFateIT.java       |  5 +-
 .../apache/accumulo/test/ample/TestAmpleUtil.java  | 24 +++---
 .../test/fate/ManagerRepoIT_SimpleSuite.java       | 66 ++++++++--------
 10 files changed, 198 insertions(+), 212 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/FatePartition.java 
b/core/src/main/java/org/apache/accumulo/core/fate/FatePartition.java
index 33cbbc9724..44907cb788 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FatePartition.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FatePartition.java
@@ -60,4 +60,8 @@ public record FatePartition(FateId start, FateId end) {
     }
 
   }
+
+  public FateInstanceType getType() {
+    return start.getType();
+  }
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index b1edc70857..204de1d79d 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -50,7 +50,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -77,7 +76,6 @@ import org.apache.accumulo.core.fate.FateCleaner;
 import org.apache.accumulo.core.fate.FateClient;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
-import org.apache.accumulo.core.fate.FatePartition;
 import org.apache.accumulo.core.fate.FateStore;
 import org.apache.accumulo.core.fate.TraceRepo;
 import org.apache.accumulo.core.fate.user.UserFateStore;
@@ -122,11 +120,9 @@ import 
org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
 import org.apache.accumulo.manager.fate.FateManager;
 import org.apache.accumulo.manager.fate.FateWorker;
 import org.apache.accumulo.manager.merge.FindMergeableRangeTask;
-import org.apache.accumulo.manager.metrics.fate.FateExecutorMetricsProducer;
 import org.apache.accumulo.manager.metrics.fate.meta.MetaFateMetrics;
 import org.apache.accumulo.manager.metrics.fate.user.UserFateMetrics;
 import org.apache.accumulo.manager.recovery.RecoveryManager;
-import org.apache.accumulo.manager.split.FileRangeCache;
 import org.apache.accumulo.manager.split.Splitter;
 import org.apache.accumulo.manager.state.TableCounts;
 import org.apache.accumulo.manager.tableOps.FateEnv;
@@ -178,10 +174,8 @@ import io.opentelemetry.context.Scope;
  * <p>
  * The manager will also coordinate log recoveries and reports general status.
  */
-// TODO create standalone PrimaryFateEnv class and pull everything into there 
relatated to
-// FateEnv... this will make it much more clear the env is for metadata ops 
only
 public class Manager extends AbstractServer
-    implements LiveTServerSet.Listener, FateEnv, PrimaryManagerThriftService {
+    implements LiveTServerSet.Listener, PrimaryManagerThriftService {
 
   static final Logger log = LoggerFactory.getLogger(Manager.class);
 
@@ -227,8 +221,6 @@ public class Manager extends AbstractServer
   private final CountDownLatch fateReadyLatch = new CountDownLatch(1);
   private final AtomicReference<Map<FateInstanceType,FateClient<FateEnv>>> 
fateClients =
       new AtomicReference<>();
-  private final AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> fateRefs =
-      new AtomicReference<>();
   private volatile FateManager fateManager;
 
   static class TServerStatus {
@@ -286,7 +278,6 @@ public class Manager extends AbstractServer
 
   private final long timeToCacheRecoveryWalExistence;
   private ExecutorService tableInformationStatusPool = null;
-  private ThreadPoolExecutor tabletRefreshThreadPool;
 
   private final TabletStateStore rootTabletStore;
   private final TabletStateStore metadataTabletStore;
@@ -340,21 +331,15 @@ public class Manager extends AbstractServer
   }
 
   /**
-   * Retrieve the Fate object, blocking until it is ready. This could cause 
problems if Fate
+   * Retrieve the FateClient object, blocking until it is ready. This could 
cause problems if Fate
    * operations are attempted to be used prior to the Manager being ready for 
them. If these
    * operations are triggered by a client side request from a tserver or 
client, it should be safe
    * to wait to handle those until Fate is ready, but if it occurs during an 
upgrade, or some other
    * time in the Manager before Fate is started, that may result in a deadlock 
and will need to be
    * fixed.
    *
-   * @return the Fate object, only after the fate components are running and 
ready
+   * @return the FateClient object, only after the fate components are running 
and ready
    */
-  public Fate<FateEnv> fate(FateInstanceType type) {
-    waitForFate();
-    var fate = requireNonNull(fateRefs.get(), "fateRefs is not set 
yet").get(type);
-    return requireNonNull(fate, () -> "fate type " + type + " is not present");
-  }
-
   public FateClient<FateEnv> fateClient(FateInstanceType type) {
     waitForFate();
     var client = requireNonNull(fateClients.get(), "fateClients is not set 
yet").get(type);
@@ -497,16 +482,10 @@ public class Manager extends AbstractServer
     return result;
   }
 
-  @Override
   public TableManager getTableManager() {
     return getContext().getTableManager();
   }
 
-  @Override
-  public ThreadPoolExecutor getTabletRefreshThreadPool() {
-    return tabletRefreshThreadPool;
-  }
-
   public static void main(String[] args) throws Exception {
     AbstractServer.startServer(new Manager(new ServerOpts(), 
ServerContext::new, args), log);
   }
@@ -585,17 +564,11 @@ public class Manager extends AbstractServer
   }
 
   private Splitter splitter;
-  private FileRangeCache fileRangeCache;
 
   public Splitter getSplitter() {
     return splitter;
   }
 
-  @Override
-  public FileRangeCache getFileRangeCache() {
-    return fileRangeCache;
-  }
-
   public UpgradeCoordinator.UpgradeStatus getUpgradeStatus() {
     return upgradeCoordinator.getStatus();
   }
@@ -717,14 +690,14 @@ public class Manager extends AbstractServer
             case CLEAN_STOP:
               switch (getManagerState()) {
                 case NORMAL:
-                  fateManager.stop(Duration.ofMinutes(1));
+                  fateManager.stop(FateInstanceType.USER, 
Duration.ofMinutes(1));
                   setManagerState(ManagerState.SAFE_MODE);
                   break;
                 case SAFE_MODE: {
                   // META fate stores its data in Zookeeper and its operations 
interact with
                   // metadata and root tablets, need to completely shut it 
down before unloading
                   // metadata and root tablets
-                  fate(FateInstanceType.META).shutdown(1, MINUTES);
+                  fateManager.stop(FateInstanceType.META, 
Duration.ofMinutes(1));
                   int count = nonMetaDataTabletsAssignedOrHosted();
                   log.debug(
                       String.format("There are %d non-metadata tablets 
assigned or hosted", count));
@@ -953,9 +926,6 @@ public class Manager extends AbstractServer
         
getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
     metricsInfo.addMetricsProducers(new UserFateMetrics(getContext(),
         
getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
-    metricsInfo.addMetricsProducers(new 
FateExecutorMetricsProducer(getContext(),
-        fate(FateInstanceType.META).getFateExecutors(),
-        
getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
     metricsInfo.addMetricsProducers(this);
   }
 
@@ -1045,11 +1015,6 @@ public class Manager extends AbstractServer
     tableInformationStatusPool = ThreadPools.getServerThreadPools()
         .createExecutorService(getConfiguration(), 
Property.MANAGER_STATUS_THREAD_POOL_SIZE, false);
 
-    tabletRefreshThreadPool = 
ThreadPools.getServerThreadPools().getPoolBuilder("Tablet refresh ")
-        
.numCoreThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MINTHREADS))
-        
.numMaxThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MAXTHREADS))
-        .build();
-
     Thread statusThread = Threads.createCriticalThread("Status Thread", new 
StatusThread());
     statusThread.start();
 
@@ -1195,7 +1160,6 @@ public class Manager extends AbstractServer
 
     this.splitter = new Splitter(this);
     this.splitter.start();
-    this.fileRangeCache = new FileRangeCache(context);
 
     setupFate(context);
 
@@ -1283,8 +1247,8 @@ public class Manager extends AbstractServer
     }
 
     log.debug("Shutting down fate.");
-    fate(FateInstanceType.META).close();
-    fateManager.stop(Duration.ZERO);
+    fateManager.stop(FateInstanceType.USER, Duration.ZERO);
+    fateManager.stop(FateInstanceType.META, Duration.ZERO);
 
     splitter.stop();
 
@@ -1296,7 +1260,6 @@ public class Manager extends AbstractServer
     }
 
     tableInformationStatusPool.shutdownNow();
-    tabletRefreshThreadPool.shutdownNow();
 
     compactionCoordinator.shutdown();
 
@@ -1344,12 +1307,10 @@ public class Manager extends AbstractServer
           lock -> ServiceLock.isLockHeld(context.getZooCache(), lock);
       var metaStore = new MetaFateStore<FateEnv>(context.getZooSession(),
           primaryManagerLock.getLockID(), isLockHeld);
-      var metaInstance = createFateInstance(this, metaStore, context);
-      // configure this instance to process all data
-      
metaInstance.setPartitions(Set.of(FatePartition.all(FateInstanceType.META)));
+      var metaFateClient = new FateClient<>(metaStore, TraceRepo::toLogString);
       var userStore = new UserFateStore<FateEnv>(context, 
SystemTables.FATE.tableName(),
           managerLock.getLockID(), isLockHeld);
-      var userFateClient = new FateClient<FateEnv>(userStore, 
TraceRepo::toLogString);
+      var userFateClient = new FateClient<>(userStore, TraceRepo::toLogString);
 
       var metaCleaner = new FateCleaner<>(metaStore, Duration.ofHours(8), 
this::getSteadyTime);
       ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
@@ -1359,14 +1320,10 @@ public class Manager extends AbstractServer
           .scheduleWithFixedDelay(userCleaner::ageOff, 10, 4 * 60, MINUTES));
 
       if (!fateClients.compareAndSet(null,
-          Map.of(FateInstanceType.META, metaInstance, FateInstanceType.USER, 
userFateClient))) {
+          Map.of(FateInstanceType.META, metaFateClient, FateInstanceType.USER, 
userFateClient))) {
         throw new IllegalStateException(
             "Unexpected previous fateClient reference map already 
initialized");
       }
-      if (!fateRefs.compareAndSet(null, Map.of(FateInstanceType.META, 
metaInstance))) {
-        throw new IllegalStateException(
-            "Unexpected previous fate reference map already initialized");
-      }
 
       fateReadyLatch.countDown();
     } catch (KeeperException | InterruptedException e) {
@@ -1522,11 +1479,6 @@ public class Manager extends AbstractServer
     }
   }
 
-  @Override
-  public ServiceLock getServiceLock() {
-    return primaryManagerLock;
-  }
-
   private ServiceLockData getPrimaryManagerLock(final ServiceLockPath 
zManagerLoc)
       throws KeeperException, InterruptedException {
     var zooKeeper = getContext().getZooSession();
@@ -1647,7 +1599,6 @@ public class Manager extends AbstractServer
     return result;
   }
 
-  @Override
   public Set<TServerInstance> onlineTabletServers() {
     return tserverSet.getSnapshot().getTservers();
   }
@@ -1660,12 +1611,6 @@ public class Manager extends AbstractServer
     return nextEvent;
   }
 
-  @Override
-  public EventPublisher getEventPublisher() {
-    return nextEvent;
-  }
-
-  @Override
   public VolumeManager getVolumeManager() {
     return getContext().getVolumeManager();
   }
@@ -1732,7 +1677,6 @@ public class Manager extends AbstractServer
    * monotonic clock, which will be approximately consistent between different 
managers or different
    * runs of the same manager. SteadyTime supports both nanoseconds and 
milliseconds.
    */
-  @Override
   public SteadyTime getSteadyTime() {
     return timeKeeper.getTime();
   }
@@ -1760,14 +1704,4 @@ public class Manager extends AbstractServer
   public ServiceLock getLock() {
     return managerLock;
   }
-
-  /**
-   * Get Threads Pool instance which is used by blocked I/O
-   *
-   * @return {@link ExecutorService}
-   */
-  @Override
-  public ExecutorService getRenamePool() {
-    return this.renamePool;
-  }
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java
index f5724a0f9d..a791da3229 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java
@@ -32,7 +32,9 @@ import java.util.stream.Collectors;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FatePartition;
+import org.apache.accumulo.core.fate.FateStore;
 import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
 import org.apache.accumulo.core.manager.thrift.FateWorkerService;
 import org.apache.accumulo.core.metadata.SystemTables;
@@ -44,6 +46,7 @@ import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.manager.tableOps.FateEnv;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,7 +106,8 @@ public class FateManager {
         }
         Map<HostAndPort,Set<FatePartition>> currentAssignments = new 
HashMap<>();
         currentPartitions.forEach((k, v) -> currentAssignments.put(k, 
v.partitions()));
-        Set<FatePartition> desiredParititions = 
getDesiredPartitions(currentAssignments.size());
+        Map<FateInstanceType,Set<FatePartition>> desiredParititions =
+            getDesiredPartitions(currentAssignments.size());
 
         Map<HostAndPort,Set<FatePartition>> desired =
             computeDesiredAssignments(currentAssignments, desiredParititions);
@@ -188,7 +192,7 @@ public class FateManager {
   @SuppressFBWarnings(value = "SWL_SLEEP_WITH_LOCK_HELD",
       justification = "Sleep is okay. Can hold the lock as long as needed, as 
we are shutting down."
           + " Don't need or want other operations to run.")
-  public synchronized void stop(Duration timeout) {
+  public synchronized void stop(FateInstanceType fateType, Duration timeout) {
     if (!stop.compareAndSet(false, true)) {
       return;
     }
@@ -219,7 +223,9 @@ public class FateManager {
       var currentPartitions = entry.getValue();
       if (!currentPartitions.partitions.isEmpty()) {
         try {
-          setPartitions(hostPort, currentPartitions.updateId(), Set.of());
+          var copy = new HashSet<>(currentPartitions.partitions);
+          copy.removeIf(fp -> fp.getType() == fateType);
+          setPartitions(hostPort, currentPartitions.updateId(), copy);
         } catch (TException e) {
           log.warn("Failed to unassign fate partitions {}", hostPort, e);
         }
@@ -229,8 +235,16 @@ public class FateManager {
     stableAssignments.set(TreeRangeMap.create());
 
     if (!timer.isExpired()) {
-      var store = new UserFateStore<FateEnv>(context, 
SystemTables.FATE.tableName(), null, null);
-
+      FateStore<FateEnv> store = switch (fateType) {
+        case USER -> new UserFateStore<FateEnv>(context, 
SystemTables.FATE.tableName(), null, null);
+        case META -> {
+          try {
+            yield new MetaFateStore<>(context.getZooSession(), null, null);
+          } catch (KeeperException | InterruptedException e) {
+            throw new IllegalStateException(e);
+          }
+        }
+      };
       var reserved = 
store.getActiveReservations(Set.of(FatePartition.all(FateInstanceType.USER)));
       while (!reserved.isEmpty() && !timer.isExpired()) {
         if (log.isTraceEnabled()) {
@@ -333,28 +347,41 @@ public class FateManager {
    */
   private Map<HostAndPort,Set<FatePartition>> computeDesiredAssignments(
       Map<HostAndPort,Set<FatePartition>> currentAssignments,
-      Set<FatePartition> desiredParititions) {
+      Map<FateInstanceType,Set<FatePartition>> desiredParititions) {
 
-    Preconditions.checkArgument(currentAssignments.size() == 
desiredParititions.size());
     Map<HostAndPort,Set<FatePartition>> desiredAssignments = new HashMap<>();
 
-    var copy = new HashSet<>(desiredParititions);
+    currentAssignments.keySet().forEach(hp -> {
+      desiredAssignments.put(hp, new HashSet<>());
+    });
+
+    desiredParititions.forEach((fateType, desiredForType) -> {
+      // This code can not handle more than one partition per host
+      Preconditions.checkState(desiredForType.size() <= 
currentAssignments.size());
+
+      var added = new HashSet<FatePartition>();
+
+      currentAssignments.forEach((hp, partitions) -> {
+        var hostAssignments = desiredAssignments.get(hp);
+        partitions.forEach(partition -> {
+          if (desiredForType.contains(partition)
+              && hostAssignments.stream().noneMatch(fp -> fp.getType() == 
fateType)
+              && !added.contains(partition)) {
+            hostAssignments.add(partition);
+            Preconditions.checkState(added.add(partition));
+          }
+        });
+      });
 
-    currentAssignments.forEach((hp, partitions) -> {
-      if (!partitions.isEmpty()) {
-        var firstPart = partitions.iterator().next();
-        if (copy.contains(firstPart)) {
-          desiredAssignments.put(hp, Set.of(firstPart));
-          copy.remove(firstPart);
+      var iter = Sets.difference(desiredForType, added).iterator();
+      currentAssignments.forEach((hp, partitions) -> {
+        var hostAssignments = desiredAssignments.get(hp);
+        if (iter.hasNext() && hostAssignments.stream().noneMatch(fp -> 
fp.getType() == fateType)) {
+          hostAssignments.add(iter.next());
         }
-      }
-    });
+      });
 
-    var iter = copy.iterator();
-    currentAssignments.forEach((hp, partitions) -> {
-      if (!desiredAssignments.containsKey(hp)) {
-        desiredAssignments.put(hp, Set.of(iter.next()));
-      }
+      Preconditions.checkState(!iter.hasNext());
     });
 
     if (log.isTraceEnabled()) {
@@ -363,7 +390,6 @@ public class FateManager {
         log.trace(" desired {} {} {}", hp, parts.size(), parts);
       });
     }
-
     return desiredAssignments;
   }
 
@@ -371,15 +397,23 @@ public class FateManager {
    * Computes a single partition for each worker such that the partition cover 
all possible UUIDs
    * and evenly divide the UUIDs.
    */
-  private Set<FatePartition> getDesiredPartitions(int numWorkers) {
+  private Map<FateInstanceType,Set<FatePartition>> getDesiredPartitions(int 
numWorkers) {
     Preconditions.checkArgument(numWorkers >= 0);
 
     if (numWorkers == 0) {
-      return Set.of();
+      return Map.of(FateInstanceType.META, Set.of(), FateInstanceType.USER, 
Set.of());
     }
 
     // create a single partition per worker that equally divides the space
-    HashSet<FatePartition> desired = new HashSet<>();
+    Map<FateInstanceType,Set<FatePartition>> desired = new HashMap<>();
+
+    // meta fate will never see much activity, so give it a single partition.
+    desired.put(FateInstanceType.META,
+        Set.of(new FatePartition(FateId.from(FateInstanceType.META, new 
UUID(0, 0)),
+            FateId.from(FateInstanceType.META, new UUID(-1, -1)))));
+
+    Set<FatePartition> desiredUser = new HashSet<>();
+
     // All the shifting is because java does not have unsigned integers. Want 
to evenly partition
     // [0,2^64) into numWorker ranges, but can not directly do that. Work w/ 
60 bit unsigned
     // integers to partition the space and then shift over by 4. Used 60 bits 
instead of 63 so it
@@ -392,7 +426,7 @@ public class FateManager {
       UUID startUuid = new UUID(start, 0);
       UUID endUuid = new UUID(end, 0);
 
-      desired.add(new FatePartition(FateId.from(FateInstanceType.USER, 
startUuid),
+      desiredUser.add(new FatePartition(FateId.from(FateInstanceType.USER, 
startUuid),
           FateId.from(FateInstanceType.USER, endUuid)));
     }
 
@@ -400,9 +434,11 @@ public class FateManager {
     UUID startUuid = new UUID(start, 0);
     // last partition has a special end uuid that is all f nibbles.
     UUID endUuid = new UUID(-1, -1);
-    desired.add(new FatePartition(FateId.from(FateInstanceType.USER, 
startUuid),
+    desiredUser.add(new FatePartition(FateId.from(FateInstanceType.USER, 
startUuid),
         FateId.from(FateInstanceType.USER, endUuid)));
 
+    desired.put(FateInstanceType.USER, desiredUser);
+
     return desired;
   }
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java
index a1409af98e..f60fc453c0 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java
@@ -20,8 +20,11 @@ package org.apache.accumulo.manager.fate;
 
 import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
 
+import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -31,9 +34,11 @@ import org.apache.accumulo.core.clientImpl.thrift.TInfo;
 import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FatePartition;
 import org.apache.accumulo.core.fate.FateStore;
 import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.core.lock.ServiceLock;
 import org.apache.accumulo.core.manager.thrift.FateWorkerService;
@@ -49,6 +54,7 @@ import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.manager.LiveTServerSet;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
 import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,8 +67,7 @@ public class FateWorker implements FateWorkerService.Iface {
   private final AuditedSecurityOperation security;
   private final LiveTServerSet liveTserverSet;
   private final FateFactory fateFactory;
-  private Fate<FateEnv> fate;
-  private FateWorkerEnv fateWorkerEnv;
+  private final Map<FateInstanceType,Fate<FateEnv>> fates = new 
ConcurrentHashMap<>();
 
   public interface FateFactory {
     Fate<FateEnv> create(FateEnv env, FateStore<FateEnv> store, ServerContext 
context);
@@ -71,17 +76,23 @@ public class FateWorker implements FateWorkerService.Iface {
   public FateWorker(ServerContext ctx, LiveTServerSet liveTServerSet, 
FateFactory fateFactory) {
     this.context = ctx;
     this.security = ctx.getSecurityOperation();
-    this.fate = null;
     this.liveTserverSet = liveTServerSet;
     this.fateFactory = fateFactory;
   }
 
   public synchronized void setLock(ServiceLock lock) {
-    fateWorkerEnv = new FateWorkerEnv(context, lock, liveTserverSet);
+    FateWorkerEnv fateWorkerEnv = new FateWorkerEnv(context, lock, 
liveTserverSet);
     Predicate<ZooUtil.LockID> isLockHeld = l -> 
ServiceLock.isLockHeld(context.getZooCache(), l);
+    try {
+      MetaFateStore<FateEnv> metaStore =
+          new MetaFateStore<>(context.getZooSession(), lock.getLockID(), 
isLockHeld);
+      this.fates.put(FateInstanceType.META, fateFactory.create(fateWorkerEnv, 
metaStore, context));
+    } catch (KeeperException | InterruptedException e) {
+      throw new IllegalStateException(e);
+    }
     UserFateStore<FateEnv> store =
         new UserFateStore<>(context, SystemTables.FATE.tableName(), 
lock.getLockID(), isLockHeld);
-    this.fate = fateFactory.create(fateWorkerEnv, store, context);
+    this.fates.put(FateInstanceType.USER, fateFactory.create(fateWorkerEnv, 
store, context));
   }
 
   private Long expectedUpdateId = null;
@@ -105,12 +116,8 @@ public class FateWorker implements FateWorkerService.Iface 
{
       // id
       expectedUpdateId = updateId;
 
-      if (fate == null) {
-        return new TFatePartitions(updateId, List.of());
-      } else {
-        return new TFatePartitions(updateId,
-            
fate.getPartitions().stream().map(FatePartition::toThrift).toList());
-      }
+      return new TFatePartitions(updateId, fates.values().stream()
+          .flatMap(fate -> 
fate.getPartitions().stream()).map(FatePartition::toThrift).toList());
     }
   }
 
@@ -137,16 +144,22 @@ public class FateWorker implements 
FateWorkerService.Iface {
     synchronized (this) {
       // The primary manager should not assign any fate partitions until after 
upgrade is complete.
       Preconditions.checkState(isUpgradeComplete());
-      if (fate != null && expectedUpdateId != null && updateId == 
expectedUpdateId) {
+
+      if (expectedUpdateId != null && updateId == expectedUpdateId) {
         // Set to null which makes it so that an update id can only be used 
once.
         expectedUpdateId = null;
-        var desiredSet = 
desired.stream().map(FatePartition::from).collect(Collectors.toSet());
-        var oldPartitions = fate.setPartitions(desiredSet);
-        log.info("Changed partitions from {} to {}", oldPartitions, 
desiredSet);
+        for (var fateType : FateInstanceType.values()) {
+          var fate = fates.get(fateType);
+          var desiredSet = desired.stream().map(FatePartition::from)
+              .filter(fp -> fp.getType() == 
fateType).collect(Collectors.toSet());
+          var oldPartitions = fate.setPartitions(desiredSet);
+          log.info("Changed partitions for {} from {} to {}", fateType, 
oldPartitions, desiredSet);
+        }
+
         return true;
       } else {
-        log.debug("Did not change partitions to {} expectedUpdateId:{} 
updateId:{} fate==null:{}",
-            desired, expectedUpdateId, updateId, fate == null);
+        log.debug("Did not change partitions to {} expectedUpdateId:{} 
updateId:{} fates:{}",
+            desired, expectedUpdateId, updateId, fates.keySet());
         return false;
       }
     }
@@ -161,28 +174,24 @@ public class FateWorker implements 
FateWorkerService.Iface {
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
     }
 
-    Fate<FateEnv> localFate;
-    synchronized (this) {
-      localFate = fate;
-    }
+    Map<FateInstanceType,Set<FatePartition>> partitions =
+        tpartitions.stream().map(FatePartition::from)
+            .collect(Collectors.groupingBy(FatePartition::getType, 
Collectors.toSet()));
 
-    if (localFate != null) {
-      
localFate.seeded(tpartitions.stream().map(FatePartition::from).collect(Collectors.toSet()));
-    }
-  }
-
-  public synchronized void stop() {
-    fate.shutdown(1, TimeUnit.MINUTES);
-    fate.close();
-    fateWorkerEnv.stop();
-    fate = null;
-    fateWorkerEnv = null;
+    partitions.forEach((fateType, typePartitions) -> {
+      var fate = fates.get(fateType);
+      if (fate != null) {
+        fate.seeded(typePartitions);
+      }
+    });
   }
 
   public synchronized MetricsProducer[] getMetricsProducers() {
-    Preconditions.checkState(fate != null, "Not started yet");
-    return new MetricsProducer[] {
-        new FateExecutorMetricsProducer(context, fate.getFateExecutors(), 
context.getConfiguration()
-            
.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL))};
+    Preconditions.checkState(!fates.isEmpty(), "Not started yet");
+    return Arrays.stream(FateInstanceType.values()).map(fates::get)
+        .map(fate -> new FateExecutorMetricsProducer(context, 
fate.getFateExecutors(),
+            context.getConfiguration()
+                
.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)))
+        .toArray(MetricsProducer[]::new);
   }
 }
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java
index c95dcd9ad6..8d407d763d 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java
@@ -38,7 +38,7 @@ import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.zookeeper.ZooSession;
-import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.FateEnv;
 import org.apache.accumulo.manager.tableOps.delete.PreDeleteTable;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.zookeeper.data.Stat;
@@ -74,24 +74,24 @@ public class CompactionDriverTest {
     }
   }
 
-  private Manager manager;
+  private FateEnv fateEnv;
   private ServerContext ctx;
   private ZooSession zk;
 
   @BeforeEach
   public void setup() {
-    manager = createMock(Manager.class);
+    fateEnv = createMock(FateEnv.class);
     ctx = createMock(ServerContext.class);
     zk = createMock(ZooSession.class);
     expect(ctx.getInstanceID()).andReturn(instance).anyTimes();
     expect(ctx.getZooSession()).andReturn(zk).anyTimes();
     expect(zk.asReaderWriter()).andReturn(new ZooReaderWriter(zk)).anyTimes();
-    expect(manager.getContext()).andReturn(ctx).anyTimes();
+    expect(fateEnv.getContext()).andReturn(ctx).anyTimes();
   }
 
   @AfterEach
   public void teardown() {
-    verify(manager, ctx, zk);
+    verify(fateEnv, ctx, zk);
   }
 
   @Test
@@ -107,9 +107,9 @@ public class CompactionDriverTest {
   }
 
   private void runDriver(CompactionDriver driver, String expectedMessage) {
-    replay(manager, ctx, zk);
+    replay(fateEnv, ctx, zk);
     var e = assertThrows(AcceptableThriftTableOperationException.class,
-        () -> driver.isReady(FateId.from(FateInstanceType.USER, 
UUID.randomUUID()), manager));
+        () -> driver.isReady(FateId.from(FateInstanceType.USER, 
UUID.randomUUID()), fateEnv));
     assertEquals(e.getTableId(), tableId.toString());
     assertEquals(e.getOp(), TableOperation.COMPACT);
     assertEquals(e.getType(), TableOperationExceptionType.OTHER);
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
index 9e3037dbe0..7bd6824f2f 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
@@ -84,7 +84,7 @@ import 
org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.time.SteadyTime;
-import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.FateEnv;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.gc.AllVolumesDirectory;
 import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl;
@@ -425,7 +425,7 @@ public class MergeTabletsTest {
             end == null ? null : end.getBytes(UTF_8), 
MergeInfo.Operation.MERGE);
     MergeTablets mergeTablets = new MergeTablets(mergeInfo);
 
-    Manager manager = EasyMock.mock(Manager.class);
+    FateEnv fateEnv = EasyMock.mock(FateEnv.class);
     ServerContext context = EasyMock.mock(ServerContext.class);
     Ample ample = EasyMock.mock(Ample.class);
     TabletsMetadata.Builder tabletBuilder = 
EasyMock.mock(TabletsMetadata.Builder.class);
@@ -438,7 +438,7 @@ public class MergeTabletsTest {
     
EasyMock.expect(context.getServiceLock()).andReturn(managerLock).anyTimes();
 
     // setup reading the tablets
-    EasyMock.expect(manager.getContext()).andReturn(context).atLeastOnce();
+    EasyMock.expect(fateEnv.getContext()).andReturn(context).atLeastOnce();
     EasyMock.expect(context.getAmple()).andReturn(ample).atLeastOnce();
     EasyMock.expect(ample.readTablets()).andReturn(tabletBuilder).once();
     
EasyMock.expect(tabletBuilder.forTable(tableId)).andReturn(tabletBuilder).once();
@@ -478,12 +478,12 @@ public class MergeTabletsTest {
     ample.putGcFileAndDirCandidates(tableId, dirs);
     EasyMock.expectLastCall().once();
 
-    EasyMock.replay(manager, context, ample, tabletBuilder, tabletsMetadata, 
tabletsMutator,
+    EasyMock.replay(fateEnv, context, ample, tabletBuilder, tabletsMetadata, 
tabletsMutator,
         tabletMutator, cr, managerLock);
 
-    mergeTablets.call(fateId, manager);
+    mergeTablets.call(fateId, fateEnv);
 
-    EasyMock.verify(manager, context, ample, tabletBuilder, tabletsMetadata, 
tabletsMutator,
+    EasyMock.verify(fateEnv, context, ample, tabletBuilder, tabletsMetadata, 
tabletsMutator,
         tabletMutator, cr, managerLock);
   }
 }
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
index 6ceac3d681..dad40a0cb1 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
@@ -66,8 +66,8 @@ import 
org.apache.accumulo.core.metadata.schema.TabletOperationType;
 import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.time.SteadyTime;
-import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.split.FileRangeCache;
+import org.apache.accumulo.manager.tableOps.FateEnv;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl;
 import org.apache.hadoop.fs.Path;
@@ -233,9 +233,9 @@ public class UpdateTabletsTest {
     String dir1 = "dir1";
     String dir2 = "dir2";
 
-    Manager manager = EasyMock.mock(Manager.class);
+    FateEnv fateEnv = EasyMock.mock(FateEnv.class);
     ServerContext context = EasyMock.mock(ServerContext.class);
-    EasyMock.expect(manager.getContext()).andReturn(context).atLeastOnce();
+    EasyMock.expect(fateEnv.getContext()).andReturn(context).atLeastOnce();
     Ample ample = EasyMock.mock(Ample.class);
     EasyMock.expect(context.getAmple()).andReturn(ample).atLeastOnce();
     FileRangeCache fileRangeCache = EasyMock.mock(FileRangeCache.class);
@@ -247,8 +247,8 @@ public class UpdateTabletsTest {
         .andReturn(newFileInfo("d", "f"));
     EasyMock.expect(fileRangeCache.getCachedFileInfo(tableId, file4))
         .andReturn(newFileInfo("d", "j"));
-    
EasyMock.expect(manager.getFileRangeCache()).andReturn(fileRangeCache).atLeastOnce();
-    
EasyMock.expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100_000, 
TimeUnit.SECONDS))
+    
EasyMock.expect(fateEnv.getFileRangeCache()).andReturn(fileRangeCache).atLeastOnce();
+    
EasyMock.expect(fateEnv.getSteadyTime()).andReturn(SteadyTime.from(100_000, 
TimeUnit.SECONDS))
         .atLeastOnce();
 
     ServiceLock managerLock = EasyMock.mock(ServiceLock.class);
@@ -394,7 +394,7 @@ public class UpdateTabletsTest {
     tabletsMutator.close();
     EasyMock.expectLastCall().anyTimes();
 
-    EasyMock.replay(manager, context, ample, tabletMeta, fileRangeCache, 
tabletsMutator,
+    EasyMock.replay(fateEnv, context, ample, tabletMeta, fileRangeCache, 
tabletsMutator,
         tablet1Mutator, tablet2Mutator, tablet3Mutator, cr, compactions);
     // Now we can actually test the split code that writes the new tablets 
with a bunch columns in
     // the original tablet
@@ -404,9 +404,9 @@ public class UpdateTabletsTest {
     dirNames.add(dir2);
     UpdateTablets updateTablets = new UpdateTablets(
         new SplitInfo(origExtent, 
TabletMergeabilityUtil.systemDefaultSplits(splits)), dirNames);
-    updateTablets.call(fateId, manager);
+    updateTablets.call(fateId, fateEnv);
 
-    EasyMock.verify(manager, context, ample, tabletMeta, fileRangeCache, 
tabletsMutator,
+    EasyMock.verify(fateEnv, context, ample, tabletMeta, fileRangeCache, 
tabletsMutator,
         tablet1Mutator, tablet2Mutator, tablet3Mutator, cr, compactions);
   }
 
@@ -469,15 +469,15 @@ public class UpdateTabletsTest {
 
   private static void testError(KeyExtent origExtent, TabletMetadata tm1, 
FateId fateId)
       throws Exception {
-    Manager manager = EasyMock.mock(Manager.class);
+    FateEnv fateEnv = EasyMock.mock(FateEnv.class);
     ServerContext context = EasyMock.mock(ServerContext.class);
-    EasyMock.expect(manager.getContext()).andReturn(context).atLeastOnce();
+    EasyMock.expect(fateEnv.getContext()).andReturn(context).atLeastOnce();
     Ample ample = EasyMock.mock(Ample.class);
     EasyMock.expect(context.getAmple()).andReturn(ample).atLeastOnce();
 
     EasyMock.expect(ample.readTablet(origExtent)).andReturn(tm1);
 
-    EasyMock.replay(manager, context, ample);
+    EasyMock.replay(fateEnv, context, ample);
     // Now we can actually test the split code that writes the new tablets 
with a bunch columns in
     // the original tablet
     SortedSet<Text> splits = new TreeSet<>(List.of(new Text("c")));
@@ -485,8 +485,8 @@ public class UpdateTabletsTest {
     dirNames.add("d1");
     var updateTablets = new UpdateTablets(
         new SplitInfo(origExtent, 
TabletMergeabilityUtil.systemDefaultSplits(splits)), dirNames);
-    updateTablets.call(fateId, manager);
+    updateTablets.call(fateId, fateEnv);
 
-    EasyMock.verify(manager, context, ample);
+    EasyMock.verify(fateEnv, context, ample);
   }
 }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java 
b/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java
index 5e825efbac..dce79c9953 100644
--- a/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java
@@ -84,7 +84,6 @@ import com.google.common.net.HostAndPort;
  *
  */
 public class MultipleManagerFateIT extends ConfigurableMacBase {
-
   // A manager that will quickly clean up fate reservations held by dead 
managers
   public static class FastFateCleanupManager extends Manager {
     protected FastFateCleanupManager(ServerOpts opts, String[] args) throws 
IOException {
@@ -111,6 +110,10 @@ public class MultipleManagerFateIT extends 
ConfigurableMacBase {
     cfg.getClusterServerConfiguration().setNumDefaultCompactors(8);
     // Set this lower so that locks timeout faster
     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+    // This test could kill a manager after its written a compaction to the 
metadata table, but
+    // before it returns it to the compactor via RPC which creates a dead 
compaction. Need to speed
+    // up the dead compaction detection to handle this or else the test will 
hang.
+    
cfg.setProperty(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL, 
"5s");
     cfg.setServerClass(ServerType.MANAGER, r -> FastFateCleanupManager.class);
     super.configure(cfg, hadoopCoreSite);
   }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java 
b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java
index 8b4283f31d..834e9a10bd 100644
--- a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java
+++ b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java
@@ -23,29 +23,29 @@ import static 
org.apache.accumulo.test.ample.metadata.TestAmple.testAmpleServerC
 import java.time.Duration;
 
 import org.apache.accumulo.core.util.time.SteadyTime;
-import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.FateEnv;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.test.ample.metadata.TestAmple.TestServerAmpleImpl;
 import org.easymock.EasyMock;
 
 public class TestAmpleUtil {
 
-  public static Manager mockWithAmple(ServerContext context, 
TestServerAmpleImpl ample) {
-    Manager manager = EasyMock.mock(Manager.class);
-    
EasyMock.expect(manager.getContext()).andReturn(testAmpleServerContext(context, 
ample))
+  public static FateEnv mockWithAmple(ServerContext context, 
TestServerAmpleImpl ample) {
+    FateEnv fateEnv = EasyMock.mock(FateEnv.class);
+    
EasyMock.expect(fateEnv.getContext()).andReturn(testAmpleServerContext(context, 
ample))
         .atLeastOnce();
-    EasyMock.replay(manager);
-    return manager;
+    EasyMock.replay(fateEnv);
+    return fateEnv;
   }
 
-  public static Manager mockWithAmple(ServerContext context, 
TestServerAmpleImpl ample,
+  public static FateEnv mockWithAmple(ServerContext context, 
TestServerAmpleImpl ample,
       Duration currentTime) {
-    Manager manager = EasyMock.mock(Manager.class);
-    
EasyMock.expect(manager.getContext()).andReturn(testAmpleServerContext(context, 
ample))
+    FateEnv fateEnv = EasyMock.mock(FateEnv.class);
+    
EasyMock.expect(fateEnv.getContext()).andReturn(testAmpleServerContext(context, 
ample))
         .atLeastOnce();
-    
EasyMock.expect(manager.getSteadyTime()).andReturn(SteadyTime.from(currentTime)).anyTimes();
-    EasyMock.replay(manager);
-    return manager;
+    
EasyMock.expect(fateEnv.getSteadyTime()).andReturn(SteadyTime.from(currentTime)).anyTimes();
+    EasyMock.replay(fateEnv);
+    return fateEnv;
   }
 
 }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT_SimpleSuite.java
 
b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT_SimpleSuite.java
index 1178f2d8ad..a36f870446 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT_SimpleSuite.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT_SimpleSuite.java
@@ -75,9 +75,9 @@ import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
-import org.apache.accumulo.manager.Manager;
 import 
org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason;
 import org.apache.accumulo.manager.tableOps.AbstractFateOperation;
+import org.apache.accumulo.manager.tableOps.FateEnv;
 import org.apache.accumulo.manager.tableOps.compact.CompactionDriver;
 import org.apache.accumulo.manager.tableOps.merge.DeleteRows;
 import org.apache.accumulo.manager.tableOps.merge.MergeInfo;
@@ -133,7 +133,7 @@ public class ManagerRepoIT_SimpleSuite extends 
SharedMiniClusterBase {
       TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple
           .create(getCluster().getServerContext(), Map.of(DataLevel.USER, 
metadataTable));
       testAmple.createMetadataFromExisting(client, tableId);
-      Manager manager = mockWithAmple(getCluster().getServerContext(), 
testAmple);
+      FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), 
testAmple);
 
       // Create a test operation and fate id for testing merge and delete rows
       // and add operation to test metadata for the tablet
@@ -148,15 +148,15 @@ public class ManagerRepoIT_SimpleSuite extends 
SharedMiniClusterBase {
       // Build either MergeTablets or DeleteRows repo for testing no WALs, 
both should check this
       // condition
       final MergeInfo mergeInfo = new MergeInfo(tableId,
-          manager.getContext().getNamespaceId(tableId), null, null, operation);
+          fateEnv.getContext().getNamespaceId(tableId), null, null, operation);
       final AbstractFateOperation repo =
           operation == Operation.MERGE ? new MergeTablets(mergeInfo) : new 
DeleteRows(mergeInfo);
       // Also test ReserveTablets isReady()
       final AbstractFateOperation reserve = new ReserveTablets(mergeInfo);
 
       // First, check no errors with the default case
-      assertEquals(0, reserve.isReady(fateId, manager));
-      assertNotNull(repo.call(fateId, manager));
+      assertEquals(0, reserve.isReady(fateId, fateEnv));
+      assertNotNull(repo.call(fateId, fateEnv));
 
       // Write a WAL to the test metadata and then re-run the repo to check 
for an error
       try (TabletsMutator tm = testAmple.mutateTablets()) {
@@ -165,10 +165,10 @@ public class ManagerRepoIT_SimpleSuite extends 
SharedMiniClusterBase {
       }
 
       // Should not be ready due to the presence of a WAL
-      assertTrue(reserve.isReady(fateId, manager) > 0);
+      assertTrue(reserve.isReady(fateId, fateEnv) > 0);
 
       // Repo should throw an exception due to the WAL existence
-      var thrown = assertThrows(IllegalStateException.class, () -> 
repo.call(fateId, manager));
+      var thrown = assertThrows(IllegalStateException.class, () -> 
repo.call(fateId, fateEnv));
       assertTrue(thrown.getMessage().contains("has unexpected walogs"));
     }
   }
@@ -200,57 +200,57 @@ public class ManagerRepoIT_SimpleSuite extends 
SharedMiniClusterBase {
       TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple
           .create(getCluster().getServerContext(), Map.of(DataLevel.USER, 
metadataTable));
       testAmple.createMetadataFromExisting(client, tableId);
-      Manager manager =
+      FateEnv fateEnv =
           mockWithAmple(getCluster().getServerContext(), testAmple, 
Duration.ofDays(1));
 
       // Create a test fate id
       var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
 
       // Tablet c is set to never merge
-      MergeInfo mergeInfo = new MergeInfo(tableId, 
manager.getContext().getNamespaceId(tableId),
+      MergeInfo mergeInfo = new MergeInfo(tableId, 
fateEnv.getContext().getNamespaceId(tableId),
           null, new Text("c").getBytes(), Operation.SYSTEM_MERGE);
-      var repo = new VerifyMergeability(mergeInfo).call(fateId, manager);
+      var repo = new VerifyMergeability(mergeInfo).call(fateId, fateEnv);
       assertInstanceOf(UnreserveSystemMerge.class, repo);
       assertEquals(UnmergeableReason.TABLET_MERGEABILITY,
           ((UnreserveSystemMerge) repo).getReason());
 
       // Tablets a and b are always merge
-      mergeInfo = new MergeInfo(tableId, 
manager.getContext().getNamespaceId(tableId), null,
+      mergeInfo = new MergeInfo(tableId, 
fateEnv.getContext().getNamespaceId(tableId), null,
           new Text("b").getBytes(), Operation.SYSTEM_MERGE);
-      assertInstanceOf(MergeTablets.class, new 
VerifyMergeability(mergeInfo).call(fateId, manager));
+      assertInstanceOf(MergeTablets.class, new 
VerifyMergeability(mergeInfo).call(fateId, fateEnv));
 
-      var context = manager.getContext();
+      var context = fateEnv.getContext();
 
       // split threshold is 10k so default max merge size is 2500 bytes.
       // this adds 6 files of 450 each which puts the tablets over teh 2500 
threshold
       addFileMetadata(context, tableId, null, new Text("c"), 3, 450);
 
       // Data written to the first two tablets totals 2700 bytes and is too 
large
-      repo = new VerifyMergeability(mergeInfo).call(fateId, manager);
+      repo = new VerifyMergeability(mergeInfo).call(fateId, fateEnv);
       assertInstanceOf(UnreserveSystemMerge.class, repo);
       assertEquals(UnmergeableReason.MAX_TOTAL_SIZE, ((UnreserveSystemMerge) 
repo).getReason());
 
       // Not enough time has passed for Tablet, should be able to merge d and e
-      mergeInfo = new MergeInfo(tableId, 
manager.getContext().getNamespaceId(tableId),
+      mergeInfo = new MergeInfo(tableId, 
fateEnv.getContext().getNamespaceId(tableId),
           new Text("c").getBytes(), new Text("e").getBytes(), 
Operation.SYSTEM_MERGE);
-      repo = new VerifyMergeability(mergeInfo).call(fateId, manager);
+      repo = new VerifyMergeability(mergeInfo).call(fateId, fateEnv);
       assertInstanceOf(UnreserveSystemMerge.class, repo);
       assertEquals(UnmergeableReason.TABLET_MERGEABILITY,
           ((UnreserveSystemMerge) repo).getReason());
 
       // update time to 3 days so enough time has passed
-      manager = mockWithAmple(getCluster().getServerContext(), testAmple, 
Duration.ofDays(3));
-      assertInstanceOf(MergeTablets.class, new 
VerifyMergeability(mergeInfo).call(fateId, manager));
+      fateEnv = mockWithAmple(getCluster().getServerContext(), testAmple, 
Duration.ofDays(3));
+      assertInstanceOf(MergeTablets.class, new 
VerifyMergeability(mergeInfo).call(fateId, fateEnv));
 
       // last 3 tablets should total 9 files which is < max of 10
-      mergeInfo = new MergeInfo(tableId, 
manager.getContext().getNamespaceId(tableId),
+      mergeInfo = new MergeInfo(tableId, 
fateEnv.getContext().getNamespaceId(tableId),
           new Text("c").getBytes(), null, Operation.SYSTEM_MERGE);
       addFileMetadata(context, tableId, new Text("c"), null, 3, 10);
-      assertInstanceOf(MergeTablets.class, new 
VerifyMergeability(mergeInfo).call(fateId, manager));
+      assertInstanceOf(MergeTablets.class, new 
VerifyMergeability(mergeInfo).call(fateId, fateEnv));
 
       // last 3 tablets should total 12 files which is > max of 10
       addFileMetadata(context, tableId, new Text("c"), null, 4, 10);
-      repo = new VerifyMergeability(mergeInfo).call(fateId, manager);
+      repo = new VerifyMergeability(mergeInfo).call(fateId, fateEnv);
       assertInstanceOf(UnreserveSystemMerge.class, repo);
       assertEquals(UnmergeableReason.MAX_FILE_COUNT, ((UnreserveSystemMerge) 
repo).getReason());
     }
@@ -306,7 +306,7 @@ public class ManagerRepoIT_SimpleSuite extends 
SharedMiniClusterBase {
       testAmple.mutateTablet(extent)
           .putOperation(TabletOperationId.from(TabletOperationType.SPLITTING, 
fateId)).mutate();
 
-      Manager manager = mockWithAmple(getCluster().getServerContext(), 
testAmple);
+      FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), 
testAmple);
 
       assertEquals(opid, testAmple.readTablet(extent).getOperationId());
 
@@ -314,7 +314,7 @@ public class ManagerRepoIT_SimpleSuite extends 
SharedMiniClusterBase {
           TabletMergeabilityUtil.systemDefaultSplits(new TreeSet<>(List.of(new 
Text("sp1"))))));
 
       // The repo should delete the opid and throw an exception
-      assertThrows(ThriftTableOperationException.class, () -> 
eoRepo.call(fateId, manager));
+      assertThrows(ThriftTableOperationException.class, () -> 
eoRepo.call(fateId, fateEnv));
 
       // the operation id should have been cleaned up before the exception was 
thrown
       assertNull(testAmple.readTablet(extent).getOperationId());
@@ -347,11 +347,11 @@ public class ManagerRepoIT_SimpleSuite extends 
SharedMiniClusterBase {
           not(SplitColumnFamily.UNSPLITTABLE_COLUMN));
 
       KeyExtent extent = new KeyExtent(tableId, null, null);
-      Manager manager = mockWithAmple(getCluster().getServerContext(), 
testAmple);
+      FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), 
testAmple);
 
       FindSplits findSplits = new FindSplits(extent);
       PreSplit preSplit = (PreSplit) findSplits
-          .call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), 
manager);
+          .call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), 
fateEnv);
 
       // The table should not need splitting
       assertNull(preSplit);
@@ -366,7 +366,7 @@ public class ManagerRepoIT_SimpleSuite extends 
SharedMiniClusterBase {
 
       findSplits = new FindSplits(extent);
       preSplit = (PreSplit) findSplits.call(FateId.from(FateInstanceType.USER, 
UUID.randomUUID()),
-          manager);
+          fateEnv);
 
       // The table SHOULD now need splitting
       assertNotNull(preSplit);
@@ -408,11 +408,11 @@ public class ManagerRepoIT_SimpleSuite extends 
SharedMiniClusterBase {
           not(SplitColumnFamily.UNSPLITTABLE_COLUMN));
 
       KeyExtent extent = new KeyExtent(tableId, null, null);
-      Manager manager = mockWithAmple(getCluster().getServerContext(), 
testAmple);
+      FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), 
testAmple);
 
       FindSplits findSplits = new FindSplits(extent);
       PreSplit preSplit = (PreSplit) findSplits
-          .call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), 
manager);
+          .call(FateId.from(FateInstanceType.USER, UUID.randomUUID()), 
fateEnv);
 
       // The table should not need splitting
       assertNull(preSplit);
@@ -428,7 +428,7 @@ public class ManagerRepoIT_SimpleSuite extends 
SharedMiniClusterBase {
 
       findSplits = new FindSplits(extent);
       preSplit = (PreSplit) findSplits.call(FateId.from(FateInstanceType.USER, 
UUID.randomUUID()),
-          manager);
+          fateEnv);
 
       // The table SHOULD not need splitting
       assertNull(preSplit);
@@ -462,8 +462,8 @@ public class ManagerRepoIT_SimpleSuite extends 
SharedMiniClusterBase {
       TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple
           .create(getCluster().getServerContext(), Map.of(DataLevel.USER, 
metadataTable));
       testAmple.createMetadataFromExisting(client, tableId);
-      Manager manager = mockWithAmple(getCluster().getServerContext(), 
testAmple);
-      var ctx = manager.getContext();
+      FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(), 
testAmple);
+      var ctx = fateEnv.getContext();
 
       // Create the CompactionDriver to test with the given range passed into 
the method
       final AbstractFateOperation repo = new 
CompactionDriver(ctx.getNamespaceId(tableId), tableId,
@@ -489,7 +489,7 @@ public class ManagerRepoIT_SimpleSuite extends 
SharedMiniClusterBase {
       assertEquals(4, extents.size());
 
       // First call undo using the second fateId and verify there's still 
metadata for the first one
-      repo.undo(fateId2, manager);
+      repo.undo(fateId2, fateEnv);
       try (TabletsMetadata tabletsMetadata = 
testAmple.readTablets().forTable(tableId).build()) {
         tabletsMetadata.forEach(tm -> {
           assertHasCompactionMetadata(fateId1, tm);
@@ -499,7 +499,7 @@ public class ManagerRepoIT_SimpleSuite extends 
SharedMiniClusterBase {
       // Now call undo on the first fateId which would clean up all the 
metadata for all the
       // tablets that overlap with the given range that was provided to the 
CompactionDriver
       // during the creation of the repo
-      repo.undo(fateId1, manager);
+      repo.undo(fateId1, fateEnv);
 
       // First, iterate over only the overlapping tablets and verify that 
those tablets
       // were cleaned up and remove any visited tablets from the extents set

Reply via email to