This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new af6a8ba431 Makes FateManager more robust (#6292)
af6a8ba431 is described below
commit af6a8ba431d6876945c98459678cf845dcffd5fb
Author: Keith Turner <[email protected]>
AuthorDate: Thu Apr 2 13:20:36 2026 -0700
Makes FateManager more robust (#6292)
If the FateManager had a connection error while talking to another manager,
it could kill the primary manager process. Changed the code to tolerate
such errors and keep retrying.
---
.../apache/accumulo/manager/fate/FateManager.java | 146 ++++++++++-----------
1 file changed, 73 insertions(+), 73 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java
index 6b294e0b8a..f5724a0f9d 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java
@@ -85,84 +85,87 @@ public class FateManager {
private final Map<HostAndPort,Set<FatePartition>> pendingNotifications = new
HashMap<>();
- private void manageAssistants() throws TException, InterruptedException {
+ private void manageAssistants() {
log.debug("Started Fate Manager");
long stableCount = 0;
outer: while (!stop.get()) {
-
- long sleepTime = Math.min(stableCount * 100, 5_000);
- Thread.sleep(sleepTime);
-
- // This map will contain all current workers even if their partitions
are empty
- Map<HostAndPort,CurrentPartitions> currentPartitions;
try {
- currentPartitions = getCurrentAssignments(context);
- } catch (TException e) {
- log.warn("Failed to get current partitions ", e);
- continue;
- }
- Map<HostAndPort,Set<FatePartition>> currentAssignments = new HashMap<>();
- currentPartitions.forEach((k, v) -> currentAssignments.put(k,
v.partitions()));
- Set<FatePartition> desiredParititions =
getDesiredPartitions(currentAssignments.size());
-
- Map<HostAndPort,Set<FatePartition>> desired =
- computeDesiredAssignments(currentAssignments, desiredParititions);
-
- if (desired.equals(currentAssignments)) {
- RangeMap<FateId,FateHostPartition> rangeMap = TreeRangeMap.create();
- currentAssignments.forEach((hostAndPort, partitions) -> {
- partitions.forEach(partition -> {
- rangeMap.put(Range.closed(partition.start(), partition.end()),
- new FateHostPartition(hostAndPort, partition));
+ long sleepTime = Math.min(stableCount * 100, 5_000);
+ Thread.sleep(sleepTime);
+
+ // This map will contain all current workers even if their partitions
are empty
+ Map<HostAndPort,CurrentPartitions> currentPartitions;
+ try {
+ currentPartitions = getCurrentAssignments(context);
+ } catch (TException e) {
+ log.warn("Failed to get current partitions ", e);
+ continue;
+ }
+ Map<HostAndPort,Set<FatePartition>> currentAssignments = new
HashMap<>();
+ currentPartitions.forEach((k, v) -> currentAssignments.put(k,
v.partitions()));
+ Set<FatePartition> desiredParititions =
getDesiredPartitions(currentAssignments.size());
+
+ Map<HostAndPort,Set<FatePartition>> desired =
+ computeDesiredAssignments(currentAssignments, desiredParititions);
+
+ if (desired.equals(currentAssignments)) {
+ RangeMap<FateId,FateHostPartition> rangeMap = TreeRangeMap.create();
+ currentAssignments.forEach((hostAndPort, partitions) -> {
+ partitions.forEach(partition -> {
+ rangeMap.put(Range.closed(partition.start(), partition.end()),
+ new FateHostPartition(hostAndPort, partition));
+ });
});
- });
- stableAssignments.set(rangeMap);
- stableCount++;
- } else {
- stableAssignments.set(TreeRangeMap.create());
- stableCount = 0;
- }
+ stableAssignments.set(rangeMap);
+ stableCount++;
+ } else {
+ stableAssignments.set(TreeRangeMap.create());
+ stableCount = 0;
+ }
- // are there any workers with extra partitions? If so need to unload
those first.
- int unloads = 0;
- for (Map.Entry<HostAndPort,Set<FatePartition>> entry :
desired.entrySet()) {
- HostAndPort worker = entry.getKey();
- Set<FatePartition> partitions = entry.getValue();
- var curr = currentAssignments.getOrDefault(worker, Set.of());
- if (!Sets.difference(curr, partitions).isEmpty()) {
- // This worker has extra partitions that are not desired
- var intersection = Sets.intersection(curr, partitions);
- if (!setPartitions(worker, currentPartitions.get(worker).updateId(),
intersection)) {
- log.debug("Failed to set partitions for {} to {}", worker,
intersection);
- // could not set, so start completely over
- continue outer;
- } else {
- log.debug("Set partitions for {} to {} from {}", worker,
intersection, curr);
- unloads++;
+ // are there any workers with extra partitions? If so need to unload
those first.
+ int unloads = 0;
+ for (Map.Entry<HostAndPort,Set<FatePartition>> entry :
desired.entrySet()) {
+ HostAndPort worker = entry.getKey();
+ Set<FatePartition> partitions = entry.getValue();
+ var curr = currentAssignments.getOrDefault(worker, Set.of());
+ if (!Sets.difference(curr, partitions).isEmpty()) {
+ // This worker has extra partitions that are not desired
+ var intersection = Sets.intersection(curr, partitions);
+ if (!setPartitions(worker,
currentPartitions.get(worker).updateId(), intersection)) {
+ log.debug("Failed to set partitions for {} to {}", worker,
intersection);
+ // could not set, so start completely over
+ continue outer;
+ } else {
+ log.debug("Set partitions for {} to {} from {}", worker,
intersection, curr);
+ unloads++;
+ }
}
}
- }
- if (unloads > 0) {
- // some tablets were unloaded, so start over and get new update ids
and the current
- // partitions
- continue outer;
- }
+ if (unloads > 0) {
+ // some tablets were unloaded, so start over and get new update ids
and the current
+ // partitions
+ continue outer;
+ }
- // Load all partitions on all workers..
- for (Map.Entry<HostAndPort,Set<FatePartition>> entry :
desired.entrySet()) {
- HostAndPort worker = entry.getKey();
- Set<FatePartition> partitions = entry.getValue();
- var curr = currentAssignments.getOrDefault(worker, Set.of());
- if (!curr.equals(partitions)) {
- if (!setPartitions(worker, currentPartitions.get(worker).updateId(),
partitions)) {
- log.debug("Failed to set partitions for {} to {}", worker,
partitions);
- // could not set, so start completely over
- continue outer;
- } else {
- log.debug("Set partitions for {} to {} from {}", worker,
partitions, curr);
+ // Load all partitions on all workers..
+ for (Map.Entry<HostAndPort,Set<FatePartition>> entry :
desired.entrySet()) {
+ HostAndPort worker = entry.getKey();
+ Set<FatePartition> partitions = entry.getValue();
+ var curr = currentAssignments.getOrDefault(worker, Set.of());
+ if (!curr.equals(partitions)) {
+ if (!setPartitions(worker,
currentPartitions.get(worker).updateId(), partitions)) {
+ log.debug("Failed to set partitions for {} to {}", worker,
partitions);
+ // could not set, so start completely over
+ continue outer;
+ } else {
+ log.debug("Set partitions for {} to {} from {}", worker,
partitions, curr);
+ }
}
}
+ } catch (Exception e) {
+ log.warn("Failed to assign fate partitions to managers, will retry
later", e);
}
}
}
@@ -175,13 +178,7 @@ public class FateManager {
Preconditions.checkState(ntfyThread == null);
Preconditions.checkState(!stop.get());
- assignmentThread = Threads.createCriticalThread("Fate Manager", () -> {
- try {
- manageAssistants();
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- });
+ assignmentThread = Threads.createCriticalThread("Fate Manager",
this::manageAssistants);
assignmentThread.start();
ntfyThread = Threads.createCriticalThread("Fate Notify", new NotifyTask());
@@ -322,6 +319,9 @@ public class FateManager {
log.trace("Setting partitions {} {} {}", address, updateId, desired);
return client.setPartitions(TraceUtil.traceInfo(), context.rpcCreds(),
updateId,
desired.stream().map(FatePartition::toThrift).toList());
+ } catch (TException e) {
+ log.warn("Failed to set partition on {}", address, e);
+ return false;
} finally {
ThriftUtil.returnClient(client, context);
}