This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 263f5c88df Added Multiple Manager support to Mini Accumulo (#6208)
263f5c88df is described below

commit 263f5c88df701513a50638e26dd3885fab684832
Author: Dave Marion <[email protected]>
AuthorDate: Thu Mar 12 13:44:40 2026 -0400

    Added Multiple Manager support to Mini Accumulo (#6208)
---
 .../ClusterServerConfiguration.java                |  9 +++
 .../MiniAccumuloClusterControl.java                | 74 +++++++++++++---------
 .../miniclusterImpl/MiniAccumuloClusterImpl.java   | 14 ++--
 .../org/apache/accumulo/server/util/ZooZap.java    |  9 ++-
 .../accumulo/test/ComprehensiveMultiManagerIT.java |  6 +-
 .../apache/accumulo/test/MultipleManagerIT.java    | 33 +++++++---
 6 files changed, 92 insertions(+), 53 deletions(-)

diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java
index e37ad98053..ab66268dd1 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.Constants;
 
 public class ClusterServerConfiguration {
 
+  private int managers = 1;
   private final Map<String,Integer> compactors;
   private final Map<String,Integer> sservers;
   private final Map<String,Integer> tservers;
@@ -55,6 +56,14 @@ public class ClusterServerConfiguration {
     tservers.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numTServers);
   }
 
+  public void setNumManagers(int mgrs) {
+    managers = mgrs;
+  }
+
+  public int getNumManagers() {
+    return managers;
+  }
+
   public void setNumDefaultCompactors(int numCompactors) {
     compactors.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numCompactors);
   }
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
index 9395dcd638..b075dba7ae 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
@@ -58,7 +58,7 @@ public class MiniAccumuloClusterControl implements ClusterControl {
   protected MiniAccumuloClusterImpl cluster;
 
   Process zooKeeperProcess = null;
-  Process managerProcess = null;
+  final List<Process> managerProcesses = new ArrayList<>();
   Process gcProcess = null;
   Process monitor = null;
   final Map<String,List<Process>> tabletServerProcesses = new HashMap<>();
@@ -171,10 +171,14 @@ public class MiniAccumuloClusterControl implements ClusterControl {
         }
         break;
       case MANAGER:
-        if (managerProcess == null) {
+        synchronized (managerProcesses) {
+          int numMgrs = 
cluster.getConfig().getClusterServerConfiguration().getNumManagers();
           Class<?> classToUse = classOverride != null ? classOverride
               : cluster.getConfig().getServerClass(server, 
Constants.DEFAULT_RESOURCE_GROUP_NAME);
-          managerProcess = cluster._exec(classToUse, server, configOverrides, 
args).getProcess();
+          for (int i = managerProcesses.size(); i < numMgrs; i++) {
+            managerProcesses
+                .add(cluster._exec(classToUse, server, configOverrides, 
args).getProcess());
+          }
         }
         break;
       case ZOOKEEPER:
@@ -308,20 +312,17 @@ public class MiniAccumuloClusterControl implements ClusterControl {
   public synchronized void stop(ServerType server, String hostname) throws 
IOException {
     switch (server) {
       case MANAGER:
-        if (managerProcess != null) {
+        synchronized (managerProcesses) {
           try {
-            cluster.stopProcessWithTimeout(managerProcess, 30, 
TimeUnit.SECONDS);
+            cluster.stopProcessesWithTimeout(ServerType.MANAGER, 
managerProcesses, 30,
+                TimeUnit.SECONDS);
             try {
               new ZooZap().execute(new String[] {"-manager"});
             } catch (Exception e) {
               log.error("Error zapping Manager zookeeper lock", e);
             }
-          } catch (ExecutionException | TimeoutException e) {
-            log.warn("Manager did not fully stop after 30 seconds", e);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
           } finally {
-            managerProcess = null;
+            managerProcesses.clear();
           }
         }
         break;
@@ -423,14 +424,21 @@ public class MiniAccumuloClusterControl implements ClusterControl {
     boolean found = false;
     switch (type) {
       case MANAGER:
-        if (procRef.getProcess().equals(managerProcess)) {
-          try {
-            cluster.stopProcessWithTimeout(managerProcess, 30, 
TimeUnit.SECONDS);
-          } catch (ExecutionException | TimeoutException e) {
-            log.warn("Manager did not fully stop after 30 seconds", e);
+        synchronized (managerProcesses) {
+          Iterator<Process> iter = managerProcesses.iterator();
+          while (!found && iter.hasNext()) {
+            Process process = iter.next();
+            if (procRef.getProcess().equals(process)) {
+              iter.remove();
+              try {
+                cluster.stopProcessWithTimeout(process, 30, TimeUnit.SECONDS);
+              } catch (ExecutionException | TimeoutException e) {
+                log.warn("Manager did not fully stop after 30 seconds", e);
+              }
+              found = true;
+              break;
+            }
           }
-          managerProcess = null;
-          found = true;
         }
         break;
       case TABLET_SERVER:
@@ -549,9 +557,7 @@ public class MiniAccumuloClusterControl implements ClusterControl {
         }
         break;
       case MANAGER:
-        if (!managerProcess.isAlive()) {
-          managerProcess = null;
-        }
+        managerProcesses.removeIf(process -> !process.isAlive());
         break;
       case MONITOR:
         if (!monitor.isAlive()) {
@@ -577,23 +583,31 @@ public class MiniAccumuloClusterControl implements ClusterControl {
   public Set<Process> getProcesses(ServerType type) {
     switch (type) {
       case COMPACTOR:
-        Set<Process> cprocesses = new HashSet<>();
-        compactorProcesses.values().forEach(list -> 
list.forEach(cprocesses::add));
-        return cprocesses;
+        synchronized (compactorProcesses) {
+          Set<Process> cprocesses = new HashSet<>();
+          compactorProcesses.values().forEach(list -> 
list.forEach(cprocesses::add));
+          return cprocesses;
+        }
       case GARBAGE_COLLECTOR:
         return gcProcess == null ? Set.of() : Set.of(gcProcess);
       case MANAGER:
-        return managerProcess == null ? Set.of() : Set.of(managerProcess);
+        synchronized (managerProcesses) {
+          return managerProcesses.size() == 0 ? Set.of() : 
Set.copyOf(managerProcesses);
+        }
       case MONITOR:
         return monitor == null ? Set.of() : Set.of(monitor);
       case SCAN_SERVER:
-        Set<Process> sprocesses = new HashSet<>();
-        scanServerProcesses.values().forEach(list -> 
list.forEach(sprocesses::add));
-        return sprocesses;
+        synchronized (scanServerProcesses) {
+          Set<Process> sprocesses = new HashSet<>();
+          scanServerProcesses.values().forEach(list -> 
list.forEach(sprocesses::add));
+          return sprocesses;
+        }
       case TABLET_SERVER:
-        Set<Process> tprocesses = new HashSet<>();
-        tabletServerProcesses.values().forEach(list -> 
list.forEach(tprocesses::add));
-        return tprocesses;
+        synchronized (tabletServerProcesses) {
+          Set<Process> tprocesses = new HashSet<>();
+          tabletServerProcesses.values().forEach(list -> 
list.forEach(tprocesses::add));
+          return tprocesses;
+        }
       case ZOOKEEPER:
         return zooKeeperProcess == null ? Set.of() : Set.of(zooKeeperProcess);
       default:
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index 0e178c0156..3d9103985e 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -827,8 +827,12 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
   private void verifyUp(ClientContext context, InstanceId instanceId)
       throws InterruptedException, IOException {
 
-    requireNonNull(getClusterControl().managerProcess, "Error starting Manager 
- no process");
-    waitForProcessStart(getClusterControl().managerProcess, "Manager");
+    int mgrExpectedCount = 0;
+    for (Process tsp : getClusterControl().managerProcesses) {
+      mgrExpectedCount++;
+      requireNonNull(tsp, "Error starting Manager " + mgrExpectedCount + " - 
no process");
+      waitForProcessStart(tsp, "Manager" + mgrExpectedCount);
+    }
 
     requireNonNull(getClusterControl().gcProcess, "Error starting GC - no 
process");
     waitForProcessStart(getClusterControl().gcProcess, "GC");
@@ -916,7 +920,6 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
     return Stream.of(procs).map(ProcessReference::new).collect(toList());
   }
 
-  @SuppressWarnings("removal")
   public Map<ServerType,Collection<ProcessReference>> getProcesses() {
     Map<ServerType,Collection<ProcessReference>> result = new HashMap<>();
     MiniAccumuloClusterControl control = getClusterControl();
@@ -933,9 +936,8 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
           }
           break;
         case MANAGER:
-          if (control.managerProcess != null) {
-            result.put(type, references(control.managerProcess));
-          }
+          result.put(type, 
references(control.managerProcesses.stream().collect(Collectors.toList())
+              .toArray(new Process[0])));
           break;
         case MONITOR:
           if (control.monitor != null) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
index cb64c5017e..d50c817811 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
@@ -141,9 +141,12 @@ public class ZooZap extends ServerKeywordExecutable<ZapOpts> {
     var zrw = context.getZooSession().asReaderWriter();
     if (opts.zapManager) {
       try {
-        ServiceLockPath managerLockPath = 
context.getServerPaths().createManagerPath();
-        filterSingleton(context, managerLockPath, addressSelector)
-            .ifPresent(slp -> removeSingletonLock(zrw, slp, opts));
+        Set<ServiceLockPath> managerPaths =
+            context.getServerPaths().getAssistantManagers(addressSelector, 
false);
+        for (var serverLockPath : managerPaths) {
+          message("Deleting manager " + serverLockPath.getServer() + " from 
zookeeper", opts);
+          zrw.recursiveDelete(serverLockPath.toString(), 
NodeMissingPolicy.SKIP);
+        }
       } catch (RuntimeException e) {
         log.error("Error deleting manager lock", e);
       }
diff --git a/test/src/main/java/org/apache/accumulo/test/ComprehensiveMultiManagerIT.java b/test/src/main/java/org/apache/accumulo/test/ComprehensiveMultiManagerIT.java
index 1c5f8d10a4..fc0d87369e 100644
--- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveMultiManagerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveMultiManagerIT.java
@@ -25,7 +25,6 @@ import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
-import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.fate.FateManager;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.util.Wait;
@@ -45,6 +44,7 @@ public class ComprehensiveMultiManagerIT extends ComprehensiveITBase {
     public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
         org.apache.hadoop.conf.Configuration coreSite) {
       cfg.setProperty(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION, "5s");
+      cfg.getClusterServerConfiguration().setNumManagers(3);
     }
   }
 
@@ -56,10 +56,6 @@ public class ComprehensiveMultiManagerIT extends ComprehensiveITBase {
       client.securityOperations().changeUserAuthorizations("root", 
AUTHORIZATIONS);
     }
 
-    // Start two more managers
-    getCluster().exec(Manager.class);
-    getCluster().exec(Manager.class);
-
     // Wait for 3 managers to have a fate partition assigned to them
     var srvCtx = getCluster().getServerContext();
     Wait.waitFor(() -> {
diff --git a/test/src/main/java/org/apache/accumulo/test/MultipleManagerIT.java b/test/src/main/java/org/apache/accumulo/test/MultipleManagerIT.java
index 8430ab527b..e51b13c330 100644
--- a/test/src/main/java/org/apache/accumulo/test/MultipleManagerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MultipleManagerIT.java
@@ -26,7 +26,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
@@ -58,6 +58,7 @@ import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.FateEnv;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.miniclusterImpl.ProcessReference;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.test.fate.FastFate;
 import org.apache.accumulo.test.functional.ConfigurableMacBase;
@@ -117,7 +118,6 @@ public class MultipleManagerIT extends ConfigurableMacBase {
   @Test
   public void testFate() throws Exception {
 
-    List<Process> managerWorkers = new ArrayList<>();
     var executor = Executors.newCachedThreadPool();
 
     // Start a lot of background threads that should cause fate operations to 
run.
@@ -125,9 +125,14 @@ public class MultipleManagerIT extends ConfigurableMacBase {
       // Create a table in order to wait for the single manager to become the 
primary manager
       client.tableOperations().create("waitTable");
 
+      // Store primary manager ref
+      var managerRefs = getCluster().getProcesses().get(ServerType.MANAGER);
+      assertEquals(1, managerRefs.size());
+      final var primaryManagerRef = managerRefs.iterator().next();
+
       // start more manager processes, should be assigned fate work
-      managerWorkers.add(exec(FastFateCleanupManager.class));
-      managerWorkers.add(exec(FastFateCleanupManager.class));
+      
getCluster().getConfig().getClusterServerConfiguration().setNumManagers(3);
+      getCluster().getClusterControl().start(ServerType.MANAGER);
 
       AtomicBoolean stop = new AtomicBoolean(false);
 
@@ -197,14 +202,25 @@ public class MultipleManagerIT extends ConfigurableMacBase {
       waitToSeeManagers(ctx, 3, store, false);
 
       // Start two new manager processes and wait until 5 managers are seen 
running fate operations
-      managerWorkers.add(exec(FastFateCleanupManager.class));
-      managerWorkers.add(exec(FastFateCleanupManager.class));
+      
getCluster().getConfig().getClusterServerConfiguration().setNumManagers(5);
+      getCluster().getClusterControl().start(ServerType.MANAGER);
       waitToSeeManagers(ctx, 5, store, false);
 
       // Kill two assistant manager processes. Any fate operations that were 
running should resume
       // elsewhere. Should see three manager running operations after that.
-      managerWorkers.get(2).destroy();
-      managerWorkers.get(3).destroy();
+      managerRefs = getCluster().getProcesses().get(ServerType.MANAGER);
+      assertEquals(5, managerRefs.size());
+      Iterator<ProcessReference> iter = managerRefs.iterator();
+      int killed = 0;
+      while (iter.hasNext() && killed < 2) {
+        var candidate = iter.next();
+        if (candidate.equals(primaryManagerRef)) {
+          continue;
+        }
+        getCluster().getClusterControl().killProcess(ServerType.MANAGER, 
candidate);
+        killed++;
+      }
+
       log.debug("Killed 2 managers");
       waitToSeeManagers(ctx, 3, store, true);
 
@@ -228,7 +244,6 @@ public class MultipleManagerIT extends ConfigurableMacBase {
 
     executor.shutdown();
 
-    managerWorkers.forEach(Process::destroy);
   }
 
   private static void waitToSeeManagers(ClientContext context, int 
expectedManagers,

Reply via email to