This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 263f5c88df Added Multiple Manager support to Mini Accumulo (#6208)
263f5c88df is described below
commit 263f5c88df701513a50638e26dd3885fab684832
Author: Dave Marion <[email protected]>
AuthorDate: Thu Mar 12 13:44:40 2026 -0400
Added Multiple Manager support to Mini Accumulo (#6208)
---
.../ClusterServerConfiguration.java | 9 +++
.../MiniAccumuloClusterControl.java | 74 +++++++++++++---------
.../miniclusterImpl/MiniAccumuloClusterImpl.java | 14 ++--
.../org/apache/accumulo/server/util/ZooZap.java | 9 ++-
.../accumulo/test/ComprehensiveMultiManagerIT.java | 6 +-
.../apache/accumulo/test/MultipleManagerIT.java | 33 +++++++---
6 files changed, 92 insertions(+), 53 deletions(-)
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java
index e37ad98053..ab66268dd1 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/ClusterServerConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.Constants;
public class ClusterServerConfiguration {
+ private int managers = 1;
private final Map<String,Integer> compactors;
private final Map<String,Integer> sservers;
private final Map<String,Integer> tservers;
@@ -55,6 +56,14 @@ public class ClusterServerConfiguration {
tservers.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numTServers);
}
+ public void setNumManagers(int mgrs) {
+ managers = mgrs;
+ }
+
+ public int getNumManagers() {
+ return managers;
+ }
+
public void setNumDefaultCompactors(int numCompactors) {
compactors.put(Constants.DEFAULT_RESOURCE_GROUP_NAME, numCompactors);
}
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
index 9395dcd638..b075dba7ae 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
@@ -58,7 +58,7 @@ public class MiniAccumuloClusterControl implements ClusterControl {
protected MiniAccumuloClusterImpl cluster;
Process zooKeeperProcess = null;
- Process managerProcess = null;
+ final List<Process> managerProcesses = new ArrayList<>();
Process gcProcess = null;
Process monitor = null;
final Map<String,List<Process>> tabletServerProcesses = new HashMap<>();
@@ -171,10 +171,14 @@ public class MiniAccumuloClusterControl implements ClusterControl {
}
break;
case MANAGER:
- if (managerProcess == null) {
+ synchronized (managerProcesses) {
+ int numMgrs =
cluster.getConfig().getClusterServerConfiguration().getNumManagers();
Class<?> classToUse = classOverride != null ? classOverride
: cluster.getConfig().getServerClass(server,
Constants.DEFAULT_RESOURCE_GROUP_NAME);
- managerProcess = cluster._exec(classToUse, server, configOverrides,
args).getProcess();
+ for (int i = managerProcesses.size(); i < numMgrs; i++) {
+ managerProcesses
+ .add(cluster._exec(classToUse, server, configOverrides,
args).getProcess());
+ }
}
break;
case ZOOKEEPER:
@@ -308,20 +312,17 @@ public class MiniAccumuloClusterControl implements ClusterControl {
public synchronized void stop(ServerType server, String hostname) throws
IOException {
switch (server) {
case MANAGER:
- if (managerProcess != null) {
+ synchronized (managerProcesses) {
try {
- cluster.stopProcessWithTimeout(managerProcess, 30,
TimeUnit.SECONDS);
+ cluster.stopProcessesWithTimeout(ServerType.MANAGER,
managerProcesses, 30,
+ TimeUnit.SECONDS);
try {
new ZooZap().execute(new String[] {"-manager"});
} catch (Exception e) {
log.error("Error zapping Manager zookeeper lock", e);
}
- } catch (ExecutionException | TimeoutException e) {
- log.warn("Manager did not fully stop after 30 seconds", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
} finally {
- managerProcess = null;
+ managerProcesses.clear();
}
}
break;
@@ -423,14 +424,21 @@ public class MiniAccumuloClusterControl implements ClusterControl {
boolean found = false;
switch (type) {
case MANAGER:
- if (procRef.getProcess().equals(managerProcess)) {
- try {
- cluster.stopProcessWithTimeout(managerProcess, 30,
TimeUnit.SECONDS);
- } catch (ExecutionException | TimeoutException e) {
- log.warn("Manager did not fully stop after 30 seconds", e);
+ synchronized (managerProcesses) {
+ Iterator<Process> iter = managerProcesses.iterator();
+ while (!found && iter.hasNext()) {
+ Process process = iter.next();
+ if (procRef.getProcess().equals(process)) {
+ iter.remove();
+ try {
+ cluster.stopProcessWithTimeout(process, 30, TimeUnit.SECONDS);
+ } catch (ExecutionException | TimeoutException e) {
+ log.warn("Manager did not fully stop after 30 seconds", e);
+ }
+ found = true;
+ break;
+ }
}
- managerProcess = null;
- found = true;
}
break;
case TABLET_SERVER:
@@ -549,9 +557,7 @@ public class MiniAccumuloClusterControl implements ClusterControl {
}
break;
case MANAGER:
- if (!managerProcess.isAlive()) {
- managerProcess = null;
- }
+ managerProcesses.removeIf(process -> !process.isAlive());
break;
case MONITOR:
if (!monitor.isAlive()) {
@@ -577,23 +583,31 @@ public class MiniAccumuloClusterControl implements ClusterControl {
public Set<Process> getProcesses(ServerType type) {
switch (type) {
case COMPACTOR:
- Set<Process> cprocesses = new HashSet<>();
- compactorProcesses.values().forEach(list ->
list.forEach(cprocesses::add));
- return cprocesses;
+ synchronized (compactorProcesses) {
+ Set<Process> cprocesses = new HashSet<>();
+ compactorProcesses.values().forEach(list ->
list.forEach(cprocesses::add));
+ return cprocesses;
+ }
case GARBAGE_COLLECTOR:
return gcProcess == null ? Set.of() : Set.of(gcProcess);
case MANAGER:
- return managerProcess == null ? Set.of() : Set.of(managerProcess);
+ synchronized (managerProcesses) {
+ return managerProcesses.size() == 0 ? Set.of() :
Set.copyOf(managerProcesses);
+ }
case MONITOR:
return monitor == null ? Set.of() : Set.of(monitor);
case SCAN_SERVER:
- Set<Process> sprocesses = new HashSet<>();
- scanServerProcesses.values().forEach(list ->
list.forEach(sprocesses::add));
- return sprocesses;
+ synchronized (scanServerProcesses) {
+ Set<Process> sprocesses = new HashSet<>();
+ scanServerProcesses.values().forEach(list ->
list.forEach(sprocesses::add));
+ return sprocesses;
+ }
case TABLET_SERVER:
- Set<Process> tprocesses = new HashSet<>();
- tabletServerProcesses.values().forEach(list ->
list.forEach(tprocesses::add));
- return tprocesses;
+ synchronized (tabletServerProcesses) {
+ Set<Process> tprocesses = new HashSet<>();
+ tabletServerProcesses.values().forEach(list ->
list.forEach(tprocesses::add));
+ return tprocesses;
+ }
case ZOOKEEPER:
return zooKeeperProcess == null ? Set.of() : Set.of(zooKeeperProcess);
default:
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index 0e178c0156..3d9103985e 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -827,8 +827,12 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
private void verifyUp(ClientContext context, InstanceId instanceId)
throws InterruptedException, IOException {
- requireNonNull(getClusterControl().managerProcess, "Error starting Manager
- no process");
- waitForProcessStart(getClusterControl().managerProcess, "Manager");
+ int mgrExpectedCount = 0;
+ for (Process tsp : getClusterControl().managerProcesses) {
+ mgrExpectedCount++;
+ requireNonNull(tsp, "Error starting Manager " + mgrExpectedCount + " -
no process");
+ waitForProcessStart(tsp, "Manager" + mgrExpectedCount);
+ }
requireNonNull(getClusterControl().gcProcess, "Error starting GC - no
process");
waitForProcessStart(getClusterControl().gcProcess, "GC");
@@ -916,7 +920,6 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
return Stream.of(procs).map(ProcessReference::new).collect(toList());
}
- @SuppressWarnings("removal")
public Map<ServerType,Collection<ProcessReference>> getProcesses() {
Map<ServerType,Collection<ProcessReference>> result = new HashMap<>();
MiniAccumuloClusterControl control = getClusterControl();
@@ -933,9 +936,8 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
}
break;
case MANAGER:
- if (control.managerProcess != null) {
- result.put(type, references(control.managerProcess));
- }
+ result.put(type,
references(control.managerProcesses.stream().collect(Collectors.toList())
+ .toArray(new Process[0])));
break;
case MONITOR:
if (control.monitor != null) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
index cb64c5017e..d50c817811 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
@@ -141,9 +141,12 @@ public class ZooZap extends ServerKeywordExecutable<ZapOpts> {
var zrw = context.getZooSession().asReaderWriter();
if (opts.zapManager) {
try {
- ServiceLockPath managerLockPath =
context.getServerPaths().createManagerPath();
- filterSingleton(context, managerLockPath, addressSelector)
- .ifPresent(slp -> removeSingletonLock(zrw, slp, opts));
+ Set<ServiceLockPath> managerPaths =
+ context.getServerPaths().getAssistantManagers(addressSelector,
false);
+ for (var serverLockPath : managerPaths) {
+ message("Deleting manager " + serverLockPath.getServer() + " from
zookeeper", opts);
+ zrw.recursiveDelete(serverLockPath.toString(),
NodeMissingPolicy.SKIP);
+ }
} catch (RuntimeException e) {
log.error("Error deleting manager lock", e);
}
diff --git a/test/src/main/java/org/apache/accumulo/test/ComprehensiveMultiManagerIT.java b/test/src/main/java/org/apache/accumulo/test/ComprehensiveMultiManagerIT.java
index 1c5f8d10a4..fc0d87369e 100644
--- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveMultiManagerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveMultiManagerIT.java
@@ -25,7 +25,6 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
-import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.fate.FateManager;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.util.Wait;
@@ -45,6 +44,7 @@ public class ComprehensiveMultiManagerIT extends ComprehensiveITBase {
public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
org.apache.hadoop.conf.Configuration coreSite) {
cfg.setProperty(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION, "5s");
+ cfg.getClusterServerConfiguration().setNumManagers(3);
}
}
@@ -56,10 +56,6 @@ public class ComprehensiveMultiManagerIT extends ComprehensiveITBase {
client.securityOperations().changeUserAuthorizations("root",
AUTHORIZATIONS);
}
- // Start two more managers
- getCluster().exec(Manager.class);
- getCluster().exec(Manager.class);
-
// Wait for 3 managers to have a fate partition assigned to them
var srvCtx = getCluster().getServerContext();
Wait.waitFor(() -> {
diff --git a/test/src/main/java/org/apache/accumulo/test/MultipleManagerIT.java b/test/src/main/java/org/apache/accumulo/test/MultipleManagerIT.java
index 8430ab527b..e51b13c330 100644
--- a/test/src/main/java/org/apache/accumulo/test/MultipleManagerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MultipleManagerIT.java
@@ -26,7 +26,7 @@ import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
+import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
@@ -58,6 +58,7 @@ import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.FateEnv;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.miniclusterImpl.ProcessReference;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.fate.FastFate;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
@@ -117,7 +118,6 @@ public class MultipleManagerIT extends ConfigurableMacBase {
@Test
public void testFate() throws Exception {
- List<Process> managerWorkers = new ArrayList<>();
var executor = Executors.newCachedThreadPool();
// Start a lot of background threads that should cause fate operations to
run.
@@ -125,9 +125,14 @@ public class MultipleManagerIT extends ConfigurableMacBase {
// Create a table in order to wait for the single manager to become the
primary manager
client.tableOperations().create("waitTable");
+ // Store primary manager ref
+ var managerRefs = getCluster().getProcesses().get(ServerType.MANAGER);
+ assertEquals(1, managerRefs.size());
+ final var primaryManagerRef = managerRefs.iterator().next();
+
// start more manager processes, should be assigned fate work
- managerWorkers.add(exec(FastFateCleanupManager.class));
- managerWorkers.add(exec(FastFateCleanupManager.class));
+
getCluster().getConfig().getClusterServerConfiguration().setNumManagers(3);
+ getCluster().getClusterControl().start(ServerType.MANAGER);
AtomicBoolean stop = new AtomicBoolean(false);
@@ -197,14 +202,25 @@ public class MultipleManagerIT extends ConfigurableMacBase {
waitToSeeManagers(ctx, 3, store, false);
// Start two new manager processes and wait until 5 managers are seen
running fate operations
- managerWorkers.add(exec(FastFateCleanupManager.class));
- managerWorkers.add(exec(FastFateCleanupManager.class));
+
getCluster().getConfig().getClusterServerConfiguration().setNumManagers(5);
+ getCluster().getClusterControl().start(ServerType.MANAGER);
waitToSeeManagers(ctx, 5, store, false);
// Kill two assistant manager processes. Any fate operations that were
running should resume
// elsewhere. Should see three manager running operations after that.
- managerWorkers.get(2).destroy();
- managerWorkers.get(3).destroy();
+ managerRefs = getCluster().getProcesses().get(ServerType.MANAGER);
+ assertEquals(5, managerRefs.size());
+ Iterator<ProcessReference> iter = managerRefs.iterator();
+ int killed = 0;
+ while (iter.hasNext() && killed < 2) {
+ var candidate = iter.next();
+ if (candidate.equals(primaryManagerRef)) {
+ continue;
+ }
+ getCluster().getClusterControl().killProcess(ServerType.MANAGER,
candidate);
+ killed++;
+ }
+
log.debug("Killed 2 managers");
waitToSeeManagers(ctx, 3, store, true);
@@ -228,7 +244,6 @@ public class MultipleManagerIT extends ConfigurableMacBase {
executor.shutdown();
- managerWorkers.forEach(Process::destroy);
}
private static void waitToSeeManagers(ClientContext context, int
expectedManagers,