This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new c01b34949d Added minimum Manager property (#6209)
c01b34949d is described below
commit c01b34949d9c2d9a4eddd33ce8bd62b22a0fbd79
Author: Dave Marion <[email protected]>
AuthorDate: Thu Mar 12 14:41:20 2026 -0400
Added minimum Manager property (#6209)
Added properties and code to the Manager to wait for
a minimum number of Managers to start and get the
assistant manager lock before continuing and acquiring
the primary lock.
Closes #6186
---
.../org/apache/accumulo/core/conf/Property.java | 13 +++
.../java/org/apache/accumulo/manager/Manager.java | 75 +++++++++------
...leManagerIT.java => MultipleManagerFateIT.java} | 2 +-
.../test/MultipleManagerStartupWaitIT.java | 107 +++++++++++++++++++++
4 files changed, 169 insertions(+), 28 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 99040f1f5d..19c2e55be3 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -531,6 +531,19 @@ public enum Property {
"Allow tablets for the " + SystemTables.METADATA.tableName()
+ " table to be suspended via table.suspend.duration.",
"1.8.0"),
+  MANAGER_STARTUP_MANAGER_AVAIL_MIN_COUNT("manager.startup.manager.avail.min.count", "0",
+      PropertyType.COUNT,
+      "Minimum number of managers that need to be registered before the primary manager will start. "
+          + "The waiting manager counts toward this number, so a value greater than 1 is needed to block "
+          + "when multiple managers are supposed to be running on startup. When set to 0 or less, no blocking occurs. Default is 0 (disabled).",
+      "4.0.0"),
+  MANAGER_STARTUP_MANAGER_AVAIL_MAX_WAIT("manager.startup.manager.avail.max.wait", "0",
+      PropertyType.TIMEDURATION,
+      "Maximum time manager will wait for manager available threshold "
+          + "to be reached before continuing. When set to 0 or less, will block "
+          + "indefinitely. Default is 0 to block indefinitely. Only valid when manager available "
+          + "threshold is set greater than 1.",
+      "4.0.0"),
MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT("manager.startup.tserver.avail.min.count",
"0",
PropertyType.COUNT, """
Minimum number of tservers that need to be registered before manager
will \
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index c9a7df3a3d..5ff232765c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.ServerOpts;
@@ -1019,6 +1020,14 @@ public class Manager extends AbstractServer
setupAssistantMetrics(fateWorker.getMetricsProducers());
+ // wait a configurable amount of time for multiple manager processes to
start, then
+ // try to get the primary manager lock.
+ try {
+ blockForManagers();
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+
// block until we can obtain the ZK lock for the manager. Create the
// initial lock using ThriftService.NONE. This will allow the lock
// allocation to occur, but prevent any services from getting the
@@ -1375,6 +1384,18 @@ public class Manager extends AbstractServer
       }
     }
+  private void blockForTservers() throws InterruptedException {
+    blockForServers(Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT,
+        Property.MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT, () -> tserverSet.size(), "tserver");
+  }
+
+  private void blockForManagers() throws InterruptedException {
+    blockForServers(Property.MANAGER_STARTUP_MANAGER_AVAIL_MIN_COUNT,
+        Property.MANAGER_STARTUP_MANAGER_AVAIL_MAX_WAIT, () -> getContext().getServerPaths()
+            .getAssistantManagers(AddressSelector.all(), true).size(),
+        "manager");
+  }
+
   /**
    * Allows property configuration to block manager start-up waiting for a minimum number of
-   * tservers to register in zookeeper. It also accepts a maximum time to wait - if the time
+   * servers of the given {@code serverType} to register in zookeeper. It also accepts a maximum time to wait - if the time
@@ -1390,19 +1411,19 @@ public class Manager extends AbstractServer
    *
    * @throws InterruptedException if interrupted while blocking, propagated for caller to handle.
    */
-  private void blockForTservers() throws InterruptedException {
+  private void blockForServers(Property minServerCountProperty, Property maxServerWaitProperty,
+      Supplier<Integer> numServers, String serverType) throws InterruptedException {
     Timer waitTimer = Timer.startNew();
-    long minTserverCount =
-        getConfiguration().getCount(Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT);
+    long minServerCount = getConfiguration().getCount(minServerCountProperty);
-    if (minTserverCount <= 0) {
-      log.info("tserver availability check disabled, continuing with-{} servers. To enable, set {}",
-          tserverSet.size(), Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT.getKey());
+    if (minServerCount <= 0) {
+      log.info("{} availability check disabled, continuing with {} servers. To enable, set {}",
+          serverType, numServers.get(), minServerCountProperty.getKey());
       return;
     }
-    long userWait = MILLISECONDS.toSeconds(
-        getConfiguration().getTimeInMillis(Property.MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT));
+    long userWait =
+        MILLISECONDS.toSeconds(getConfiguration().getTimeInMillis(maxServerWaitProperty));
     // Setting retry values for defined wait timeouts
     long retries = 10;
@@ -1412,8 +1433,8 @@ public class Manager extends AbstractServer
     long waitIncrement = 0;
     if (userWait <= 0) {
-      log.info("tserver availability check set to block indefinitely, To change, set {} > 0.",
-          Property.MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT.getKey());
+      log.info("{} availability check set to block indefinitely. To change, set {} > 0.",
+          serverType, maxServerWaitProperty.getKey());
       userWait = Long.MAX_VALUE;
       // If indefinitely blocking, change retry values to support incremental backoff and logging.
@@ -1423,42 +1444,42 @@ public class Manager extends AbstractServer
       waitIncrement = 5;
     }
-    Retry tserverRetry = Retry.builder().maxRetries(retries)
+    Retry serverRetry = Retry.builder().maxRetries(retries)
         .retryAfter(Duration.ofSeconds(initialWait)).incrementBy(Duration.ofSeconds(waitIncrement))
         .maxWait(Duration.ofSeconds(maxWaitPeriod)).backOffFactor(1)
         .logInterval(Duration.ofSeconds(30)).createRetry();
-    log.info("Checking for tserver availability - need to reach {} servers. Have {}",
-        minTserverCount, tserverSet.size());
+    log.info("Checking for {} availability - need to reach {} servers. Have {}", serverType,
+        minServerCount, numServers.get());
-    boolean needTservers = tserverSet.size() < minTserverCount;
+    boolean needServers = numServers.get() < minServerCount;
-    while (needTservers && tserverRetry.canRetry()) {
+    while (needServers && serverRetry.canRetry()) {
-      tserverRetry.waitForNextAttempt(log, "block until minimum tservers reached");
+      serverRetry.waitForNextAttempt(log, "block until minimum " + serverType + "s reached");
-      needTservers = tserverSet.size() < minTserverCount;
+      needServers = numServers.get() < minServerCount;
       // suppress last message once threshold reached.
-      if (needTservers) {
-        tserverRetry.logRetry(log, String.format(
-            "Blocking for tserver availability - need to reach %s servers. Have %s Time spent blocking %s seconds.",
-            minTserverCount, tserverSet.size(), waitTimer.elapsed(SECONDS)));
+      if (needServers) {
+        serverRetry.logRetry(log, String.format(
+            "Blocking for %s availability - need to reach %s servers. Have %s. Time spent blocking %s seconds.",
+            serverType, minServerCount, numServers.get(), waitTimer.elapsed(SECONDS)));
       }
-      tserverRetry.useRetry();
+      serverRetry.useRetry();
     }
-    if (tserverSet.size() < minTserverCount) {
+    if (numServers.get() < minServerCount) {
       log.warn(
-          "tserver availability check time expired - continuing. Requested {}, have {} tservers on line. "
+          "{} availability check time expired - continuing. Requested {}, have {} {}s on line."
               + " Time waiting {} sec",
-          tserverSet.size(), minTserverCount, waitTimer.elapsed(SECONDS));
+          serverType, minServerCount, numServers.get(), serverType, waitTimer.elapsed(SECONDS));
     } else {
       log.info(
-          "tserver availability check completed. Requested {}, have {} tservers on line. "
+          "{} availability check completed. Requested {}, have {} {}s on line."
               + " Time waiting {} sec",
-          tserverSet.size(), minTserverCount, waitTimer.elapsed(SECONDS));
+          serverType, minServerCount, numServers.get(), serverType, waitTimer.elapsed(SECONDS));
     }
   }
diff --git a/test/src/main/java/org/apache/accumulo/test/MultipleManagerIT.java
b/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java
similarity index 99%
rename from test/src/main/java/org/apache/accumulo/test/MultipleManagerIT.java
rename to test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java
index e51b13c330..d0d8bdb5ed 100644
--- a/test/src/main/java/org/apache/accumulo/test/MultipleManagerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java
@@ -83,7 +83,7 @@ import com.google.common.net.HostAndPort;
* </ul>
*
*/
-public class MultipleManagerIT extends ConfigurableMacBase {
+public class MultipleManagerFateIT extends ConfigurableMacBase {
// A manager that will quickly clean up fate reservations held by dead
managers
public static class FastFateCleanupManager extends Manager {
diff --git
a/test/src/main/java/org/apache/accumulo/test/MultipleManagerStartupWaitIT.java
b/test/src/main/java/org/apache/accumulo/test/MultipleManagerStartupWaitIT.java
new file mode 100644
index 0000000000..325ef97235
--- /dev/null
+++
b/test/src/main/java/org/apache/accumulo/test/MultipleManagerStartupWaitIT.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ResourceGroupId;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.KeeperException;
+import org.junit.jupiter.api.Test;
+
+public class MultipleManagerStartupWaitIT extends ConfigurableMacBase {
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // Set this lower so that locks timeout faster
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+    cfg.setProperty(Property.MANAGER_STARTUP_MANAGER_AVAIL_MIN_COUNT, "2");
+    // Must comfortably exceed the time needed to start the 2nd Manager below; otherwise the 1st
+    // Manager can time out and take the primary lock by itself, making the test racy/vacuous.
+    cfg.setProperty(Property.MANAGER_STARTUP_MANAGER_AVAIL_MAX_WAIT, "60s");
+    super.configure(cfg, hadoopCoreSite);
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    // Intentionally does not start the cluster; testManagerWait() starts it manually.
+  }
+
+  private List<String> getAssistantManagers() throws KeeperException, InterruptedException {
+    String zAsstMgrPath = Constants.ZMANAGER_ASSISTANT_LOCK + "/" + ResourceGroupId.DEFAULT;
+    return getCluster().getServerContext().getZooSession().getChildren(zAsstMgrPath, null);
+  }
+
+  @Test
+  public void testManagerWait() throws Exception {
+
+    AtomicReference<Throwable> startError = new AtomicReference<>();
+    Thread clusterThread = new Thread(() -> {
+      try {
+        super.setUp();
+      } catch (Exception e) {
+        startError.set(e);
+      }
+    });
+    clusterThread.start();
+
+    // Wait a few seconds for processes to start and
+    // for ServerContext to be created
+    Thread.sleep(10_000);
+
+    // One Manager should be up and have acquired the assistant manager lock
+    Wait.waitFor(() -> getAssistantManagers().size() == 1);
+
+    // The Primary Manager lock should not be acquired yet
+    assertNull(getCluster().getServerContext().getServerPaths().getManager(true));
+
+    // Start the 2nd Manager
+    getCluster().getConfig().getClusterServerConfiguration().setNumManagers(2);
+    getCluster().start();
+
+    // Wait for both Managers to acquire the assistant manager locks
+    Wait.waitFor(() -> getAssistantManagers().size() == 2);
+
+    // The Primary Manager lock is taken once the waiting Manager's retry loop sees both Managers
+    Wait.waitFor(() -> getCluster().getServerContext().getServerPaths().getManager(true) != null);
+
+    List<String> managers = getAssistantManagers();
+    assertEquals(2, managers.size());
+
+    Set<ServerId> primary =
+        getCluster().getServerContext().instanceOperations().getServers(ServerId.Type.MANAGER);
+    assertNotNull(primary);
+    assertEquals(1, primary.size());
+    assertTrue(managers.contains(primary.iterator().next().toHostPortString()));
+    assertNull(startError.get());
+  }
+}