This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 28d152348d Breaks Fate into Fate and FateClient (#6142)
28d152348d is described below

commit 28d152348d180b22a53b555a4b2f5eb5d2b72693
Author: Keith Turner <[email protected]>
AuthorDate: Tue Feb 24 14:53:30 2026 -0800

    Breaks Fate into Fate and FateClient (#6142)
---
 .../java/org/apache/accumulo/core/fate/Fate.java   | 146 +----------------
 .../org/apache/accumulo/core/fate/FateClient.java  | 179 +++++++++++++++++++++
 .../accumulo/manager/FateServiceHandler.java       |  50 +++---
 .../java/org/apache/accumulo/manager/Manager.java  |  36 +++--
 .../coordinator/CompactionCoordinator.java         |  24 +--
 .../coordinator/DeadCompactionDetector.java        |  20 +--
 .../manager/merge/FindMergeableRangeTask.java      |   2 +-
 .../apache/accumulo/manager/split/Splitter.java    |   2 +-
 .../compaction/CompactionCoordinatorTest.java      |   9 +-
 9 files changed, 249 insertions(+), 219 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index eebe114785..58bdc5cef6 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -19,12 +19,6 @@
 package org.apache.accumulo.core.fate;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
-import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS;
-import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW;
-import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
-import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL;
-import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.META_DEAD_RESERVATION_CLEANER_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.USER_DEAD_RESERVATION_CLEANER_POOL;
 
@@ -36,7 +30,6 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
@@ -48,18 +41,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.fate.FateStore.FateTxStore;
-import org.apache.accumulo.core.fate.FateStore.Seeder;
-import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 import org.apache.accumulo.core.logging.FateLogger;
 import org.apache.accumulo.core.manager.thrift.TFateOperation;
-import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.threads.ThreadPools;
-import org.apache.thrift.TApplicationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,16 +60,15 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
  */
 @SuppressFBWarnings(value = "CT_CONSTRUCTOR_THROW",
     justification = "Constructor validation is required for proper 
initialization")
-public class Fate<T> {
+public class Fate<T> extends FateClient<T> {
 
-  private static final Logger log = LoggerFactory.getLogger(Fate.class);
+  static final Logger log = LoggerFactory.getLogger(Fate.class);
 
   private final FateStore<T> store;
   private final ScheduledFuture<?> fatePoolsWatcherFuture;
   private final AtomicInteger needMoreThreadsWarnCount = new AtomicInteger(0);
   private final ExecutorService deadResCleanerExecutor;
 
-  private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, 
SUCCESSFUL, UNKNOWN);
   public static final Duration INITIAL_DELAY = Duration.ofSeconds(3);
   private static final Duration DEAD_RES_CLEANUP_DELAY = Duration.ofMinutes(3);
   public static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(30);
@@ -262,6 +248,7 @@ public class Fate<T> {
   public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
       Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf,
       ScheduledThreadPoolExecutor genSchedExecutor) {
+    super(store, toLogStrFunc);
     this.store = FateLogger.wrap(store, toLogStrFunc, false);
 
     fatePoolsWatcherFuture =
@@ -382,133 +369,6 @@ public class Fate<T> {
     return needMoreThreadsWarnCount;
   }
 
-  // get a transaction id back to the requester before doing any work
-  public FateId startTransaction() {
-    return store.create();
-  }
-
-  public Seeder<T> beginSeeding() {
-    return store.beginSeeding();
-  }
-
-  public void seedTransaction(FateOperation fateOp, FateKey fateKey, Repo<T> 
repo,
-      boolean autoCleanUp) {
-    try (var seeder = store.beginSeeding()) {
-      @SuppressWarnings("unused")
-      var unused = seeder.attemptToSeedTransaction(fateOp, fateKey, repo, 
autoCleanUp);
-    }
-  }
-
-  // start work in the transaction.. it is safe to call this
-  // multiple times for a transaction... but it will only seed once
-  public void seedTransaction(FateOperation fateOp, FateId fateId, Repo<T> 
repo,
-      boolean autoCleanUp, String goalMessage) {
-    log.info("[{}] Seeding {} {} {}", store.type(), fateOp, fateId, 
goalMessage);
-    store.seedTransaction(fateOp, fateId, repo, autoCleanUp);
-  }
-
-  // check on the transaction
-  public TStatus waitForCompletion(FateId fateId) {
-    return store.read(fateId).waitForStatusChange(FINISHED_STATES);
-  }
-
-  /**
-   * Attempts to cancel a running Fate transaction
-   *
-   * @param fateId fate transaction id
-   * @return true if transaction transitioned to a failed state or already in 
a completed state,
-   *         false otherwise
-   */
-  public boolean cancel(FateId fateId) {
-    for (int retries = 0; retries < 5; retries++) {
-      Optional<FateTxStore<T>> optionalTxStore = store.tryReserve(fateId);
-      if (optionalTxStore.isPresent()) {
-        var txStore = optionalTxStore.orElseThrow();
-        try {
-          TStatus status = txStore.getStatus();
-          log.info("[{}] status is: {}", store.type(), status);
-          if (status == NEW || status == SUBMITTED) {
-            txStore.setTransactionInfo(TxInfo.EXCEPTION, new 
TApplicationException(
-                TApplicationException.INTERNAL_ERROR, "Fate transaction 
cancelled by user"));
-            txStore.setStatus(FAILED_IN_PROGRESS);
-            log.info(
-                "[{}] Updated status for {} to FAILED_IN_PROGRESS because it 
was cancelled by user",
-                store.type(), fateId);
-            return true;
-          } else {
-            log.info("[{}] {} cancelled by user but already in progress or 
finished state",
-                store.type(), fateId);
-            return false;
-          }
-        } finally {
-          txStore.unreserve(Duration.ZERO);
-        }
-      } else {
-        // reserved, lets retry.
-        UtilWaitThread.sleep(500);
-      }
-    }
-    log.info("[{}] Unable to reserve transaction {} to cancel it", 
store.type(), fateId);
-    return false;
-  }
-
-  // resource cleanup
-  public void delete(FateId fateId) {
-    FateTxStore<T> txStore = store.reserve(fateId);
-    try {
-      switch (txStore.getStatus()) {
-        case NEW:
-        case SUBMITTED:
-        case FAILED:
-        case SUCCESSFUL:
-          txStore.delete();
-          break;
-        case FAILED_IN_PROGRESS:
-        case IN_PROGRESS:
-          throw new IllegalStateException("Can not delete in progress 
transaction " + fateId);
-        case UNKNOWN:
-          // nothing to do, it does not exist
-          break;
-      }
-    } finally {
-      txStore.unreserve(Duration.ZERO);
-    }
-  }
-
-  public String getReturn(FateId fateId) {
-    FateTxStore<T> txStore = store.reserve(fateId);
-    try {
-      if (txStore.getStatus() != SUCCESSFUL) {
-        throw new IllegalStateException(
-            "Tried to get exception when transaction " + fateId + " not in 
successful state");
-      }
-      return (String) txStore.getTransactionInfo(TxInfo.RETURN_VALUE);
-    } finally {
-      txStore.unreserve(Duration.ZERO);
-    }
-  }
-
-  // get reportable failures
-  public Exception getException(FateId fateId) {
-    FateTxStore<T> txStore = store.reserve(fateId);
-    try {
-      if (txStore.getStatus() != FAILED) {
-        throw new IllegalStateException(
-            "Tried to get exception when transaction " + fateId + " not in 
failed state");
-      }
-      return (Exception) txStore.getTransactionInfo(TxInfo.EXCEPTION);
-    } finally {
-      txStore.unreserve(Duration.ZERO);
-    }
-  }
-
-  /**
-   * Lists transctions for a given fate key type.
-   */
-  public Stream<FateKey> list(FateKey.FateKeyType type) {
-    return store.list(type);
-  }
-
   /**
    * Initiates shutdown of background threads that run fate operations and 
cleanup fate data and
    * optionally waits on them. Leaves the fate object in a state where it can 
still update and read
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateClient.java 
b/core/src/main/java/org/apache/accumulo/core/fate/FateClient.java
new file mode 100644
index 0000000000..2dc472e4bd
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateClient.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate;
+
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
+import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW;
+import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
+import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN;
+
+import java.time.Duration;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.logging.FateLogger;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.thrift.TApplicationException;
+
+/**
+ * Supports initiating and checking status of fate operations.
+ *
+ */
+public class FateClient<T> {
+
+  private final FateStore<T> store;
+
+  private static final EnumSet<ReadOnlyFateStore.TStatus> FINISHED_STATES =
+      EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);
+
+  public FateClient(FateStore<T> store, Function<Repo<T>,String> toLogStrFunc) 
{
+    this.store = FateLogger.wrap(store, toLogStrFunc, false);
+  }
+
+  // get a transaction id back to the requester before doing any work
+  public FateId startTransaction() {
+    return store.create();
+  }
+
+  public FateStore.Seeder<T> beginSeeding() {
+    return store.beginSeeding();
+  }
+
+  public void seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, 
Repo<T> repo,
+      boolean autoCleanUp) {
+    try (var seeder = store.beginSeeding()) {
+      @SuppressWarnings("unused")
+      var unused = seeder.attemptToSeedTransaction(fateOp, fateKey, repo, 
autoCleanUp);
+    }
+  }
+
+  // start work in the transaction.. it is safe to call this
+  // multiple times for a transaction... but it will only seed once
+  public void seedTransaction(Fate.FateOperation fateOp, FateId fateId, 
Repo<T> repo,
+      boolean autoCleanUp, String goalMessage) {
+    Fate.log.info("[{}] Seeding {} {} {}", store.type(), fateOp, fateId, 
goalMessage);
+    store.seedTransaction(fateOp, fateId, repo, autoCleanUp);
+  }
+
+  // check on the transaction
+  public ReadOnlyFateStore.TStatus waitForCompletion(FateId fateId) {
+    return store.read(fateId).waitForStatusChange(FINISHED_STATES);
+  }
+
+  /**
+   * Attempts to cancel a running Fate transaction
+   *
+   * @param fateId fate transaction id
+   * @return true if transaction transitioned to a failed state or already in 
a completed state,
+   *         false otherwise
+   */
+  public boolean cancel(FateId fateId) {
+    for (int retries = 0; retries < 5; retries++) {
+      Optional<FateStore.FateTxStore<T>> optionalTxStore = 
store.tryReserve(fateId);
+      if (optionalTxStore.isPresent()) {
+        var txStore = optionalTxStore.orElseThrow();
+        try {
+          ReadOnlyFateStore.TStatus status = txStore.getStatus();
+          Fate.log.info("[{}] status is: {}", store.type(), status);
+          if (status == NEW || status == SUBMITTED) {
+            txStore.setTransactionInfo(Fate.TxInfo.EXCEPTION, new 
TApplicationException(
+                TApplicationException.INTERNAL_ERROR, "Fate transaction 
cancelled by user"));
+            txStore.setStatus(FAILED_IN_PROGRESS);
+            Fate.log.info(
+                "[{}] Updated status for {} to FAILED_IN_PROGRESS because it 
was cancelled by user",
+                store.type(), fateId);
+            return true;
+          } else {
+            Fate.log.info("[{}] {} cancelled by user but already in progress 
or finished state",
+                store.type(), fateId);
+            return false;
+          }
+        } finally {
+          txStore.unreserve(Duration.ZERO);
+        }
+      } else {
+        // reserved, lets retry.
+        UtilWaitThread.sleep(500);
+      }
+    }
+    Fate.log.info("[{}] Unable to reserve transaction {} to cancel it", 
store.type(), fateId);
+    return false;
+  }
+
+  // resource cleanup
+  public void delete(FateId fateId) {
+    FateStore.FateTxStore<T> txStore = store.reserve(fateId);
+    try {
+      switch (txStore.getStatus()) {
+        case NEW:
+        case SUBMITTED:
+        case FAILED:
+        case SUCCESSFUL:
+          txStore.delete();
+          break;
+        case FAILED_IN_PROGRESS:
+        case IN_PROGRESS:
+          throw new IllegalStateException("Can not delete in progress 
transaction " + fateId);
+        case UNKNOWN:
+          // nothing to do, it does not exist
+          break;
+      }
+    } finally {
+      txStore.unreserve(Duration.ZERO);
+    }
+  }
+
+  public String getReturn(FateId fateId) {
+    FateStore.FateTxStore<T> txStore = store.reserve(fateId);
+    try {
+      if (txStore.getStatus() != SUCCESSFUL) {
+        throw new IllegalStateException(
+            "Tried to get exception when transaction " + fateId + " not in 
successful state");
+      }
+      return (String) txStore.getTransactionInfo(Fate.TxInfo.RETURN_VALUE);
+    } finally {
+      txStore.unreserve(Duration.ZERO);
+    }
+  }
+
+  // get reportable failures
+  public Exception getException(FateId fateId) {
+    FateStore.FateTxStore<T> txStore = store.reserve(fateId);
+    try {
+      if (txStore.getStatus() != FAILED) {
+        throw new IllegalStateException(
+            "Tried to get exception when transaction " + fateId + " not in 
failed state");
+      }
+      return (Exception) txStore.getTransactionInfo(Fate.TxInfo.EXCEPTION);
+    } finally {
+      txStore.unreserve(Duration.ZERO);
+    }
+  }
+
+  /**
+   * Lists transctions for a given fate key type.
+   */
+  public Stream<FateKey> list(FateKey.FateKeyType type) {
+    return store.list(type);
+  }
+}
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
index f48ef33e19..f230e17bee 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
@@ -131,7 +131,7 @@ class FateServiceHandler implements FateService.Iface {
       throws ThriftSecurityException {
     authenticate(credentials);
     return new TFateId(type,
-        
manager.fate(FateInstanceType.fromThrift(type)).startTransaction().getTxUUIDStr());
+        
manager.fateClient(FateInstanceType.fromThrift(type)).startTransaction().getTxUUIDStr());
   }
 
   @Override
@@ -156,7 +156,7 @@ class FateServiceHandler implements FateService.Iface {
         }
 
         goalMessage += "Create " + namespace + " namespace.";
-        manager.fate(type).seedTransaction(op, fateId,
+        manager.fateClient(type).seedTransaction(op, fateId,
             new TraceRepo<>(new CreateNamespace(c.getPrincipal(), namespace, 
options)), autoCleanup,
             goalMessage);
         break;
@@ -175,7 +175,7 @@ class FateServiceHandler implements FateService.Iface {
         }
 
         goalMessage += "Rename " + oldName + " namespace to " + newName;
-        manager.fate(type).seedTransaction(op, fateId,
+        manager.fateClient(type).seedTransaction(op, fateId,
             new TraceRepo<>(new RenameNamespace(namespaceId, oldName, 
newName)), autoCleanup,
             goalMessage);
         break;
@@ -193,7 +193,7 @@ class FateServiceHandler implements FateService.Iface {
         }
 
         goalMessage += "Delete namespace Id: " + namespaceId;
-        manager.fate(type).seedTransaction(op, fateId,
+        manager.fateClient(type).seedTransaction(op, fateId,
             new TraceRepo<>(new DeleteNamespace(namespaceId)), autoCleanup, 
goalMessage);
         break;
       }
@@ -251,7 +251,7 @@ class FateServiceHandler implements FateService.Iface {
         goalMessage += "Create table " + tableName + " " + initialTableState + 
" with " + splitCount
             + " splits and initial tabletAvailability of " + 
initialTabletAvailability;
 
-        manager.fate(type).seedTransaction(op, fateId,
+        manager.fateClient(type).seedTransaction(op, fateId,
             new TraceRepo<>(new CreateTable(c.getPrincipal(), tableName, 
timeType, options,
                 splitsPath, splitCount, splitsDirsPath, initialTableState,
                 // Set the default tablet to be auto-mergeable with other 
tablets if it is split
@@ -287,7 +287,7 @@ class FateServiceHandler implements FateService.Iface {
         goalMessage += "Rename table " + oldTableName + "(" + tableId + ") to 
" + oldTableName;
 
         try {
-          manager.fate(type).seedTransaction(op, fateId,
+          manager.fateClient(type).seedTransaction(op, fateId,
               new TraceRepo<>(new RenameTable(namespaceId, tableId, 
oldTableName, newTableName)),
               autoCleanup, goalMessage);
         } catch (NamespaceNotFoundException e) {
@@ -369,7 +369,7 @@ class FateServiceHandler implements FateService.Iface {
           goalMessage += " and keep offline.";
         }
 
-        manager.fate(type).seedTransaction(op, fateId,
+        manager.fateClient(type).seedTransaction(op, fateId,
             new TraceRepo<>(new CloneTable(c.getPrincipal(), srcNamespaceId, 
srcTableId,
                 namespaceId, tableName, propertiesToSet, propertiesToExclude, 
keepOffline)),
             autoCleanup, goalMessage);
@@ -399,7 +399,7 @@ class FateServiceHandler implements FateService.Iface {
         }
 
         goalMessage += "Delete table " + tableName + "(" + tableId + ")";
-        manager.fate(type).seedTransaction(op, fateId,
+        manager.fateClient(type).seedTransaction(op, fateId,
             new TraceRepo<>(new PreDeleteTable(namespaceId, tableId)), 
autoCleanup, goalMessage);
         break;
       }
@@ -426,7 +426,7 @@ class FateServiceHandler implements FateService.Iface {
         goalMessage += "Online table " + tableId;
         final EnumSet<TableState> expectedCurrStates =
             EnumSet.of(TableState.ONLINE, TableState.OFFLINE);
-        manager.fate(type).seedTransaction(op, fateId,
+        manager.fateClient(type).seedTransaction(op, fateId,
             new TraceRepo<>(
                 new ChangeTableState(namespaceId, tableId, tableOp, 
expectedCurrStates)),
             autoCleanup, goalMessage);
@@ -455,7 +455,7 @@ class FateServiceHandler implements FateService.Iface {
         goalMessage += "Offline table " + tableId;
         final EnumSet<TableState> expectedCurrStates =
             EnumSet.of(TableState.ONLINE, TableState.OFFLINE);
-        manager.fate(type).seedTransaction(op, fateId,
+        manager.fateClient(type).seedTransaction(op, fateId,
             new TraceRepo<>(
                 new ChangeTableState(namespaceId, tableId, tableOp, 
expectedCurrStates)),
             autoCleanup, goalMessage);
@@ -491,7 +491,7 @@ class FateServiceHandler implements FateService.Iface {
             startRowStr, endRowStr);
         goalMessage += "Merge table " + tableName + "(" + tableId + ") splits 
from " + startRowStr
             + " to " + endRowStr;
-        manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(
+        manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>(
             new TableRangeOp(MergeInfo.Operation.MERGE, namespaceId, tableId, 
startRow, endRow)),
             autoCleanup, goalMessage);
         break;
@@ -523,7 +523,7 @@ class FateServiceHandler implements FateService.Iface {
 
         goalMessage +=
             "Delete table " + tableName + "(" + tableId + ") range " + 
startRow + " to " + endRow;
-        manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(
+        manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>(
             new TableRangeOp(MergeInfo.Operation.DELETE, namespaceId, tableId, 
startRow, endRow)),
             autoCleanup, goalMessage);
         break;
@@ -549,7 +549,7 @@ class FateServiceHandler implements FateService.Iface {
         }
 
         goalMessage += "Compact table (" + tableId + ") with config " + 
compactionConfig;
-        manager.fate(type).seedTransaction(op, fateId,
+        manager.fateClient(type).seedTransaction(op, fateId,
             new TraceRepo<>(new CompactRange(namespaceId, tableId, 
compactionConfig)), autoCleanup,
             goalMessage);
         break;
@@ -573,7 +573,7 @@ class FateServiceHandler implements FateService.Iface {
         }
 
         goalMessage += "Cancel compaction of table (" + tableId + ")";
-        manager.fate(type).seedTransaction(op, fateId,
+        manager.fateClient(type).seedTransaction(op, fateId,
             new TraceRepo<>(new CancelCompactions(namespaceId, tableId)), 
autoCleanup, goalMessage);
         break;
       }
@@ -608,7 +608,7 @@ class FateServiceHandler implements FateService.Iface {
         }
 
         goalMessage += "Import table with new name: " + tableName + " from " + 
exportDirs;
-        manager.fate(type)
+        manager.fateClient(type)
             .seedTransaction(op, fateId, new TraceRepo<>(new 
ImportTable(c.getPrincipal(),
                 tableName, exportDirs, namespaceId, keepMappings, 
keepOffline)), autoCleanup,
                 goalMessage);
@@ -638,7 +638,7 @@ class FateServiceHandler implements FateService.Iface {
         }
 
         goalMessage += "Export table " + tableName + "(" + tableId + ") to " + 
exportDir;
-        manager.fate(type).seedTransaction(op, fateId,
+        manager.fateClient(type).seedTransaction(op, fateId,
             new TraceRepo<>(new ExportTable(namespaceId, tableName, tableId, 
exportDir)),
             autoCleanup, goalMessage);
         break;
@@ -673,7 +673,7 @@ class FateServiceHandler implements FateService.Iface {
         }
 
         goalMessage += "Bulk import (v2)  " + dir + " to " + tableName + "(" + 
tableId + ")";
-        manager.fate(type).seedTransaction(op, fateId,
+        manager.fateClient(type).seedTransaction(op, fateId,
             new TraceRepo<>(new ComputeBulkRange(tableId, dir, setTime)), 
autoCleanup, goalMessage);
         break;
       }
@@ -717,7 +717,7 @@ class FateServiceHandler implements FateService.Iface {
 
         goalMessage += "Set availability for table: " + tableName + "(" + 
tableId + ") range: "
             + tRange + " to: " + tabletAvailability.name();
-        manager.fate(type).seedTransaction(op, fateId,
+        manager.fateClient(type).seedTransaction(op, fateId,
             new TraceRepo<>(new LockTable(tableId, namespaceId, tRange, 
tabletAvailability)),
             autoCleanup, goalMessage);
         break;
@@ -791,8 +791,8 @@ class FateServiceHandler implements FateService.Iface {
         }
 
         goalMessage = "Splitting " + extent + " for user into " + 
(splits.size() + 1) + " tablets";
-        manager.fate(type).seedTransaction(op, fateId, new PreSplit(extent, 
splits), autoCleanup,
-            goalMessage);
+        manager.fateClient(type).seedTransaction(op, fateId, new 
PreSplit(extent, splits),
+            autoCleanup, goalMessage);
         break;
       }
       default:
@@ -844,9 +844,9 @@ class FateServiceHandler implements FateService.Iface {
 
     FateId fateId = FateId.fromThrift(opid);
     FateInstanceType type = fateId.getType();
-    TStatus status = manager.fate(type).waitForCompletion(fateId);
+    TStatus status = manager.fateClient(type).waitForCompletion(fateId);
     if (status == TStatus.FAILED) {
-      Exception e = manager.fate(type).getException(fateId);
+      Exception e = manager.fateClient(type).getException(fateId);
       if (e instanceof ThriftTableOperationException) {
         throw (ThriftTableOperationException) e;
       } else if (e instanceof ThriftSecurityException) {
@@ -858,7 +858,7 @@ class FateServiceHandler implements FateService.Iface {
       }
     }
 
-    String ret = manager.fate(type).getReturn(fateId);
+    String ret = manager.fateClient(type).getReturn(fateId);
     if (ret == null) {
       ret = ""; // thrift does not like returning null
     }
@@ -870,7 +870,7 @@ class FateServiceHandler implements FateService.Iface {
       throws ThriftSecurityException {
     authenticate(credentials);
     FateId fateId = FateId.fromThrift(opid);
-    manager.fate(fateId.getType()).delete(fateId);
+    manager.fateClient(fateId.getType()).delete(fateId);
   }
 
   protected void authenticate(TCredentials credentials) throws 
ThriftSecurityException {
@@ -984,6 +984,6 @@ class FateServiceHandler implements FateService.Iface {
           SecurityErrorCode.PERMISSION_DENIED);
     }
 
-    return manager.fate(fateId.getType()).cancel(fateId);
+    return manager.fateClient(fateId.getType()).cancel(fateId);
   }
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 06f227967e..1a3f9c5443 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -68,6 +68,7 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.Fate;
 import org.apache.accumulo.core.fate.FateCleaner;
+import org.apache.accumulo.core.fate.FateClient;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateStore;
@@ -290,17 +291,7 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener,
     return getManagerState() != ManagerState.STOP;
   }
 
-  /**
-   * Retrieve the Fate object, blocking until it is ready. This could cause 
problems if Fate
-   * operations are attempted to be used prior to the Manager being ready for 
them. If these
-   * operations are triggered by a client side request from a tserver or 
client, it should be safe
-   * to wait to handle those until Fate is ready, but if it occurs during an 
upgrade, or some other
-   * time in the Manager before Fate is started, that may result in a deadlock 
and will need to be
-   * fixed.
-   *
-   * @return the Fate object, only after the fate components are running and 
ready
-   */
-  public Fate<FateEnv> fate(FateInstanceType type) {
+  private void waitForFate() {
     try {
       // block up to 30 seconds until it's ready; if it's still not ready, 
introduce some logging
       if (!fateReadyLatch.await(30, SECONDS)) {
@@ -321,7 +312,26 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener,
       Thread.currentThread().interrupt();
       throw new IllegalStateException("Thread was interrupted; cannot 
proceed");
     }
-    return getFateRefs().get(type);
+  }
+
+  /**
+   * Retrieve the Fate object, blocking until it is ready. This could cause 
problems if Fate
+   * operations are attempted to be used prior to the Manager being ready for 
them. If these
+   * operations are triggered by a client side request from a tserver or 
client, it should be safe
+   * to wait to handle those until Fate is ready, but if it occurs during an 
upgrade, or some other
+   * time in the Manager before Fate is started, that may result in a deadlock 
and will need to be
+   * fixed.
+   *
+   * @return the Fate object, only after the fate components are running and 
ready
+   */
+  public Fate<FateEnv> fate(FateInstanceType type) {
+    waitForFate();
+    var fate = Objects.requireNonNull(fateRefs.get(), "fateRefs is not set 
yet").get(type);
+    return Objects.requireNonNull(fate, () -> "fate type " + type + " is not 
present");
+  }
+
+  public FateClient<FateEnv> fateClient(FateInstanceType type) {
+    return fate(type);
   }
 
   static final boolean X = true;
@@ -924,7 +934,7 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener,
     // Start the Manager's Fate Service
     fateServiceHandler = new FateServiceHandler(this);
     managerClientHandler = new ManagerClientServiceHandler(this);
-    compactionCoordinator = new CompactionCoordinator(this, fateRefs);
+    compactionCoordinator = new CompactionCoordinator(this, this::fateClient);
 
     var processor = ThriftProcessorTypes.getManagerTProcessor(this, 
fateServiceHandler,
         compactionCoordinator.getThriftService(), managerClientHandler, 
getContext());
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index e6c0c86f2e..d6915b5155 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -56,8 +56,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -90,6 +90,7 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateClient;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateKey;
@@ -121,7 +122,6 @@ import 
org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
 import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
 import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
 import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
-import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.cache.Caches.CacheName;
 import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
 import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
@@ -271,7 +271,7 @@ public class CompactionCoordinator
   private final ServerContext ctx;
   private final AuditedSecurityOperation security;
   private final CompactionJobQueues jobQueues;
-  private final AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> 
fateInstances;
+  private final Function<FateInstanceType,FateClient<FateEnv>> fateClients;
   // Exposed for tests
   protected final CountDownLatch shutdown = new CountDownLatch(1);
 
@@ -291,7 +291,7 @@ public class CompactionCoordinator
   private final Set<String> activeCompactorReservationRequest = 
ConcurrentHashMap.newKeySet();
 
   public CompactionCoordinator(Manager manager,
-      AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> fateInstances) {
+      Function<FateInstanceType,FateClient<FateEnv>> fateClients) {
     this.ctx = manager.getContext();
     this.security = ctx.getSecurityOperation();
     this.manager = Objects.requireNonNull(manager);
@@ -303,7 +303,7 @@ public class CompactionCoordinator
 
     this.queueMetrics = new QueueMetrics(jobQueues);
 
-    this.fateInstances = fateInstances;
+    this.fateClients = fateClients;
 
     completed = 
ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true)
         .maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build();
@@ -326,7 +326,7 @@ public class CompactionCoordinator
         .maximumWeight(10485760L).weigher(weigher).build();
 
     deadCompactionDetector =
-        new DeadCompactionDetector(this.ctx, this, ctx.getScheduledExecutor(), 
fateInstances);
+        new DeadCompactionDetector(this.ctx, this, ctx.getScheduledExecutor(), 
fateClients);
 
     var rootReservationPool = 
ThreadPools.getServerThreadPools().createExecutorService(
         ctx.getConfiguration(), 
Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT, true);
@@ -789,17 +789,9 @@ public class CompactionCoordinator
     }
 
     // maybe fate has not started yet
-    var localFates = fateInstances.get();
-    while (localFates == null) {
-      UtilWaitThread.sleep(100);
-      if (shutdown.getCount() == 0) {
-        return;
-      }
-      localFates = fateInstances.get();
-    }
-
     var extent = KeyExtent.fromThrift(textent);
-    var localFate = 
localFates.get(FateInstanceType.fromTableId(extent.tableId()));
+    var fateType = FateInstanceType.fromTableId(extent.tableId());
+    var localFate = fateClients.apply(fateType);
 
     LOG.info("Compaction completed, id: {}, stats: {}, extent: {}", 
externalCompactionId, stats,
         extent);
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
index da852f1bb1..ce04296a61 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.manager.compaction.coordinator;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -27,14 +28,14 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateClient;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateKey;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
@@ -62,16 +63,16 @@ public class DeadCompactionDetector {
   private final ScheduledThreadPoolExecutor schedExecutor;
   private final ConcurrentHashMap<ExternalCompactionId,Long> deadCompactions;
   private final Set<TableId> tablesWithUnreferencedTmpFiles = new HashSet<>();
-  private final AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> 
fateInstances;
+  private final Function<FateInstanceType,FateClient<FateEnv>> fateClients;
 
   public DeadCompactionDetector(ServerContext context, CompactionCoordinator 
coordinator,
       ScheduledThreadPoolExecutor stpe,
-      AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> fateInstances) {
+      Function<FateInstanceType,FateClient<FateEnv>> fateClients) {
     this.context = context;
     this.coordinator = coordinator;
     this.schedExecutor = stpe;
     this.deadCompactions = new ConcurrentHashMap<>();
-    this.fateInstances = fateInstances;
+    this.fateClients = fateClients;
   }
 
   public void addTableId(TableId tableWithUnreferencedTmpFiles) {
@@ -196,13 +197,8 @@ public class DeadCompactionDetector {
 
       if (!tabletCompactions.isEmpty()) {
         // look for any compactions committing in fate and remove those
-        var fateMap = fateInstances.get();
-        if (fateMap == null) {
-          log.warn("Fate is not present, can not look for dead compactions");
-          return;
-        }
-        try (Stream<FateKey> keyStream = fateMap.values().stream()
-            .flatMap(fate -> 
fate.list(FateKey.FateKeyType.COMPACTION_COMMIT))) {
+        try (Stream<FateKey> keyStream = 
Arrays.stream(FateInstanceType.values()).map(fateClients)
+            .flatMap(fateClient -> 
fateClient.list(FateKey.FateKeyType.COMPACTION_COMMIT))) {
           keyStream.map(fateKey -> 
fateKey.getCompactionId().orElseThrow()).forEach(ecid -> {
             if (tabletCompactions.remove(ecid) != null) {
               log.debug("Ignoring compaction {} that is committing in a fate", 
ecid);
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java
index a59bee95ef..11c5b69211 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java
@@ -158,7 +158,7 @@ public class FindMergeableRangeTask implements Runnable {
         tableId, startRowStr, endRowStr);
     var fateKey = FateKey.forMerge(new KeyExtent(tableId, range.endRow, 
range.startRow));
 
-    manager.fate(type).seedTransaction(FateOperation.SYSTEM_MERGE, fateKey,
+    manager.fateClient(type).seedTransaction(FateOperation.SYSTEM_MERGE, 
fateKey,
         new TraceRepo<>(
             new TableRangeOp(Operation.SYSTEM_MERGE, namespaceId, tableId, 
startRow, endRow)),
         true);
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
index 1f21fde170..93d4c1cf03 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
@@ -89,7 +89,7 @@ public class Splitter {
 
   private void seedSplits(FateInstanceType instanceType, Map<Text,KeyExtent> 
splits) {
     if (!splits.isEmpty()) {
-      try (var seeder = manager.fate(instanceType).beginSeeding()) {
+      try (var seeder = manager.fateClient(instanceType).beginSeeding()) {
         for (KeyExtent extent : splits.values()) {
           @SuppressWarnings("unused")
           var unused = 
seeder.attemptToSeedTransaction(Fate.FateOperation.SYSTEM_SPLIT,
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index b121faf2b3..0800c42d2a 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -39,7 +39,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
@@ -55,7 +54,6 @@ import org.apache.accumulo.core.data.ResourceGroupId;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
-import org.apache.accumulo.core.fate.Fate;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
@@ -79,7 +77,6 @@ import org.apache.accumulo.manager.Manager;
 import 
org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
 import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
 import org.apache.accumulo.manager.compaction.queue.ResolvedCompactionJob;
-import org.apache.accumulo.manager.tableOps.FateEnv;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
@@ -94,10 +91,6 @@ import com.google.common.net.HostAndPort;
 
 public class CompactionCoordinatorTest {
 
-  // Need a non-null fateInstances reference for 
CompactionCoordinator.compactionCompleted
-  private static final AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> 
fateInstances =
-      new AtomicReference<>(Map.of());
-
   private static final ResourceGroupId GROUP_ID = ResourceGroupId.of("R2DQ");
 
   private final HostAndPort tserverAddr = HostAndPort.fromParts("192.168.1.1", 
9090);
@@ -118,7 +111,7 @@ public class CompactionCoordinatorTest {
     private Set<ExternalCompactionId> metadataCompactionIds = null;
 
     public TestCoordinator(Manager manager, List<RunningCompaction> 
runningCompactions) {
-      super(manager, fateInstances);
+      super(manager, t -> null);
       this.runningCompactions = runningCompactions;
     }
 


Reply via email to