This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new af6a8ba431 Makes FateManager more robust (#6292)
af6a8ba431 is described below

commit af6a8ba431d6876945c98459678cf845dcffd5fb
Author: Keith Turner <[email protected]>
AuthorDate: Thu Apr 2 13:20:36 2026 -0700

    Makes FateManager more robust (#6292)
    
    If the FateManager hit a connection error while talking to another
    manager, the resulting exception could kill the primary manager
    process. The code now tolerates these errors and keeps retrying.
---
 .../apache/accumulo/manager/fate/FateManager.java  | 146 ++++++++++-----------
 1 file changed, 73 insertions(+), 73 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java
index 6b294e0b8a..f5724a0f9d 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java
@@ -85,84 +85,87 @@ public class FateManager {
 
   private final Map<HostAndPort,Set<FatePartition>> pendingNotifications = new 
HashMap<>();
 
-  private void manageAssistants() throws TException, InterruptedException {
+  private void manageAssistants() {
     log.debug("Started Fate Manager");
     long stableCount = 0;
     outer: while (!stop.get()) {
-
-      long sleepTime = Math.min(stableCount * 100, 5_000);
-      Thread.sleep(sleepTime);
-
-      // This map will contain all current workers even if their partitions 
are empty
-      Map<HostAndPort,CurrentPartitions> currentPartitions;
       try {
-        currentPartitions = getCurrentAssignments(context);
-      } catch (TException e) {
-        log.warn("Failed to get current partitions ", e);
-        continue;
-      }
-      Map<HostAndPort,Set<FatePartition>> currentAssignments = new HashMap<>();
-      currentPartitions.forEach((k, v) -> currentAssignments.put(k, 
v.partitions()));
-      Set<FatePartition> desiredParititions = 
getDesiredPartitions(currentAssignments.size());
-
-      Map<HostAndPort,Set<FatePartition>> desired =
-          computeDesiredAssignments(currentAssignments, desiredParititions);
-
-      if (desired.equals(currentAssignments)) {
-        RangeMap<FateId,FateHostPartition> rangeMap = TreeRangeMap.create();
-        currentAssignments.forEach((hostAndPort, partitions) -> {
-          partitions.forEach(partition -> {
-            rangeMap.put(Range.closed(partition.start(), partition.end()),
-                new FateHostPartition(hostAndPort, partition));
+        long sleepTime = Math.min(stableCount * 100, 5_000);
+        Thread.sleep(sleepTime);
+
+        // This map will contain all current workers even if their partitions 
are empty
+        Map<HostAndPort,CurrentPartitions> currentPartitions;
+        try {
+          currentPartitions = getCurrentAssignments(context);
+        } catch (TException e) {
+          log.warn("Failed to get current partitions ", e);
+          continue;
+        }
+        Map<HostAndPort,Set<FatePartition>> currentAssignments = new 
HashMap<>();
+        currentPartitions.forEach((k, v) -> currentAssignments.put(k, 
v.partitions()));
+        Set<FatePartition> desiredParititions = 
getDesiredPartitions(currentAssignments.size());
+
+        Map<HostAndPort,Set<FatePartition>> desired =
+            computeDesiredAssignments(currentAssignments, desiredParititions);
+
+        if (desired.equals(currentAssignments)) {
+          RangeMap<FateId,FateHostPartition> rangeMap = TreeRangeMap.create();
+          currentAssignments.forEach((hostAndPort, partitions) -> {
+            partitions.forEach(partition -> {
+              rangeMap.put(Range.closed(partition.start(), partition.end()),
+                  new FateHostPartition(hostAndPort, partition));
+            });
           });
-        });
-        stableAssignments.set(rangeMap);
-        stableCount++;
-      } else {
-        stableAssignments.set(TreeRangeMap.create());
-        stableCount = 0;
-      }
+          stableAssignments.set(rangeMap);
+          stableCount++;
+        } else {
+          stableAssignments.set(TreeRangeMap.create());
+          stableCount = 0;
+        }
 
-      // are there any workers with extra partitions? If so need to unload 
those first.
-      int unloads = 0;
-      for (Map.Entry<HostAndPort,Set<FatePartition>> entry : 
desired.entrySet()) {
-        HostAndPort worker = entry.getKey();
-        Set<FatePartition> partitions = entry.getValue();
-        var curr = currentAssignments.getOrDefault(worker, Set.of());
-        if (!Sets.difference(curr, partitions).isEmpty()) {
-          // This worker has extra partitions that are not desired
-          var intersection = Sets.intersection(curr, partitions);
-          if (!setPartitions(worker, currentPartitions.get(worker).updateId(), 
intersection)) {
-            log.debug("Failed to set partitions for {} to {}", worker, 
intersection);
-            // could not set, so start completely over
-            continue outer;
-          } else {
-            log.debug("Set partitions for {} to {} from {}", worker, 
intersection, curr);
-            unloads++;
+        // are there any workers with extra partitions? If so need to unload 
those first.
+        int unloads = 0;
+        for (Map.Entry<HostAndPort,Set<FatePartition>> entry : 
desired.entrySet()) {
+          HostAndPort worker = entry.getKey();
+          Set<FatePartition> partitions = entry.getValue();
+          var curr = currentAssignments.getOrDefault(worker, Set.of());
+          if (!Sets.difference(curr, partitions).isEmpty()) {
+            // This worker has extra partitions that are not desired
+            var intersection = Sets.intersection(curr, partitions);
+            if (!setPartitions(worker, 
currentPartitions.get(worker).updateId(), intersection)) {
+              log.debug("Failed to set partitions for {} to {}", worker, 
intersection);
+              // could not set, so start completely over
+              continue outer;
+            } else {
+              log.debug("Set partitions for {} to {} from {}", worker, 
intersection, curr);
+              unloads++;
+            }
           }
         }
-      }
 
-      if (unloads > 0) {
-        // some tablets were unloaded, so start over and get new update ids 
and the current
-        // partitions
-        continue outer;
-      }
+        if (unloads > 0) {
+          // some tablets were unloaded, so start over and get new update ids 
and the current
+          // partitions
+          continue outer;
+        }
 
-      // Load all partitions on all workers..
-      for (Map.Entry<HostAndPort,Set<FatePartition>> entry : 
desired.entrySet()) {
-        HostAndPort worker = entry.getKey();
-        Set<FatePartition> partitions = entry.getValue();
-        var curr = currentAssignments.getOrDefault(worker, Set.of());
-        if (!curr.equals(partitions)) {
-          if (!setPartitions(worker, currentPartitions.get(worker).updateId(), 
partitions)) {
-            log.debug("Failed to set partitions for {} to {}", worker, 
partitions);
-            // could not set, so start completely over
-            continue outer;
-          } else {
-            log.debug("Set partitions for {} to {} from {}", worker, 
partitions, curr);
+        // Load all partitions on all workers..
+        for (Map.Entry<HostAndPort,Set<FatePartition>> entry : 
desired.entrySet()) {
+          HostAndPort worker = entry.getKey();
+          Set<FatePartition> partitions = entry.getValue();
+          var curr = currentAssignments.getOrDefault(worker, Set.of());
+          if (!curr.equals(partitions)) {
+            if (!setPartitions(worker, 
currentPartitions.get(worker).updateId(), partitions)) {
+              log.debug("Failed to set partitions for {} to {}", worker, 
partitions);
+              // could not set, so start completely over
+              continue outer;
+            } else {
+              log.debug("Set partitions for {} to {} from {}", worker, 
partitions, curr);
+            }
           }
         }
+      } catch (Exception e) {
+        log.warn("Failed to assign fate partitions to managers, will retry 
later", e);
       }
     }
   }
@@ -175,13 +178,7 @@ public class FateManager {
     Preconditions.checkState(ntfyThread == null);
     Preconditions.checkState(!stop.get());
 
-    assignmentThread = Threads.createCriticalThread("Fate Manager", () -> {
-      try {
-        manageAssistants();
-      } catch (Exception e) {
-        throw new IllegalStateException(e);
-      }
-    });
+    assignmentThread = Threads.createCriticalThread("Fate Manager", 
this::manageAssistants);
     assignmentThread.start();
 
     ntfyThread = Threads.createCriticalThread("Fate Notify", new NotifyTask());
@@ -322,6 +319,9 @@ public class FateManager {
       log.trace("Setting partitions {} {} {}", address, updateId, desired);
       return client.setPartitions(TraceUtil.traceInfo(), context.rpcCreds(), 
updateId,
           desired.stream().map(FatePartition::toThrift).toList());
+    } catch (TException e) {
+      log.warn("Failed to set partition on {}", address, e);
+      return false;
     } finally {
       ThriftUtil.returnClient(client, context);
     }

Reply via email to