This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new c01b34949d Added minimum Manager property (#6209)
c01b34949d is described below

commit c01b34949d9c2d9a4eddd33ce8bd62b22a0fbd79
Author: Dave Marion <[email protected]>
AuthorDate: Thu Mar 12 14:41:20 2026 -0400

    Added minimum Manager property (#6209)
    
    Added properties and code to the Manager to wait for
    a minimum number of Managers to start and get the
    assistant manager lock before continuing and acquiring
    the primary lock.
    
    Closes #6186
---
 .../org/apache/accumulo/core/conf/Property.java    |  13 +++
 .../java/org/apache/accumulo/manager/Manager.java  |  75 +++++++++------
 ...leManagerIT.java => MultipleManagerFateIT.java} |   2 +-
 .../test/MultipleManagerStartupWaitIT.java         | 107 +++++++++++++++++++++
 4 files changed, 169 insertions(+), 28 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 99040f1f5d..19c2e55be3 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -531,6 +531,19 @@ public enum Property {
       "Allow tablets for the " + SystemTables.METADATA.tableName()
           + " table to be suspended via table.suspend.duration.",
       "1.8.0"),
+  
MANAGER_STARTUP_MANAGER_AVAIL_MIN_COUNT("manager.startup.manager.avail.min.count",
 "0",
+      PropertyType.COUNT,
+      "Minimum number of managers that need to be registered before the 
primary manager will start. A value "
+          + "greater than 0 is useful when multiple managers are supposed to 
be running on startup. "
+          + "When set to 0 or less, no blocking occurs. Default is 0 
(disabled).",
+      "4.0.0"),
+  
MANAGER_STARTUP_MANAGER_AVAIL_MAX_WAIT("manager.startup.manager.avail.max.wait",
 "0",
+      PropertyType.TIMEDURATION,
+      "Maximum time manager will wait for manager available threshold "
+          + "to be reached before continuing. When set to 0 or less, will 
block "
+          + "indefinitely. Default is 0 to block indefinitely. Only valid when 
manager available "
+          + "threshold is set greater than 1.",
+      "4.0.0"),
   
MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT("manager.startup.tserver.avail.min.count",
 "0",
       PropertyType.COUNT, """
           Minimum number of tservers that need to be registered before manager 
will \
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index c9a7df3a3d..5ff232765c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.cli.ServerOpts;
@@ -1019,6 +1020,14 @@ public class Manager extends AbstractServer
 
     setupAssistantMetrics(fateWorker.getMetricsProducers());
 
+    // wait a configurable amount of time for multiple manager processes to 
start, then
+    // try to get the primary manager lock.
+    try {
+      blockForManagers();
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
     // block until we can obtain the ZK lock for the manager. Create the
     // initial lock using ThriftService.NONE. This will allow the lock
     // allocation to occur, but prevent any services from getting the
@@ -1375,6 +1384,18 @@ public class Manager extends AbstractServer
     }
   }
 
+  private void blockForTservers() throws InterruptedException {
+    blockForServers(Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT,
+        Property.MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT, () -> 
tserverSet.size(), "tserver");
+  }
+
+  private void blockForManagers() throws InterruptedException {
+    blockForServers(Property.MANAGER_STARTUP_MANAGER_AVAIL_MIN_COUNT,
+        Property.MANAGER_STARTUP_MANAGER_AVAIL_MAX_WAIT, () -> 
getContext().getServerPaths()
+            .getAssistantManagers(AddressSelector.all(), true).size(),
+        "manager");
+  }
+
   /**
    * Allows property configuration to block manager start-up waiting for a 
minimum number of
    * tservers to register in zookeeper. It also accepts a maximum time to wait 
- if the time
@@ -1390,19 +1411,19 @@ public class Manager extends AbstractServer
    *
    * @throws InterruptedException if interrupted while blocking, propagated 
for caller to handle.
    */
-  private void blockForTservers() throws InterruptedException {
+  private void blockForServers(Property minServerCountProperty, Property 
minServerWaitProperty,
+      Supplier<Integer> numServers, String serverType) throws 
InterruptedException {
     Timer waitTimer = Timer.startNew();
 
-    long minTserverCount =
-        
getConfiguration().getCount(Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT);
+    long minServerCount = getConfiguration().getCount(minServerCountProperty);
 
-    if (minTserverCount <= 0) {
-      log.info("tserver availability check disabled, continuing with-{} 
servers. To enable, set {}",
-          tserverSet.size(), 
Property.MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT.getKey());
+    if (minServerCount <= 0) {
+      log.info("{} availability check disabled, continuing with-{} servers. To 
enable, set {}",
+          serverType, numServers.get(), minServerCountProperty.getKey());
       return;
     }
-    long userWait = MILLISECONDS.toSeconds(
-        
getConfiguration().getTimeInMillis(Property.MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT));
+    long userWait =
+        
MILLISECONDS.toSeconds(getConfiguration().getTimeInMillis(minServerWaitProperty));
 
     // Setting retry values for defined wait timeouts
     long retries = 10;
@@ -1412,8 +1433,8 @@ public class Manager extends AbstractServer
     long waitIncrement = 0;
 
     if (userWait <= 0) {
-      log.info("tserver availability check set to block indefinitely, To 
change, set {} > 0.",
-          Property.MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT.getKey());
+      log.info("{} availability check set to block indefinitely, To change, 
set {} > 0.",
+          serverType, minServerWaitProperty.getKey());
       userWait = Long.MAX_VALUE;
 
       // If indefinitely blocking, change retry values to support incremental 
backoff and logging.
@@ -1423,42 +1444,42 @@ public class Manager extends AbstractServer
       waitIncrement = 5;
     }
 
-    Retry tserverRetry = Retry.builder().maxRetries(retries)
+    Retry serverRetry = Retry.builder().maxRetries(retries)
         
.retryAfter(Duration.ofSeconds(initialWait)).incrementBy(Duration.ofSeconds(waitIncrement))
         .maxWait(Duration.ofSeconds(maxWaitPeriod)).backOffFactor(1)
         .logInterval(Duration.ofSeconds(30)).createRetry();
 
-    log.info("Checking for tserver availability - need to reach {} servers. 
Have {}",
-        minTserverCount, tserverSet.size());
+    log.info("Checking for {} availability - need to reach {} servers. Have 
{}", serverType,
+        minServerCount, numServers.get());
 
-    boolean needTservers = tserverSet.size() < minTserverCount;
+    boolean needServers = numServers.get() < minServerCount;
 
-    while (needTservers && tserverRetry.canRetry()) {
+    while (needServers && serverRetry.canRetry()) {
 
-      tserverRetry.waitForNextAttempt(log, "block until minimum tservers 
reached");
+      serverRetry.waitForNextAttempt(log, "block until minimum " + serverType 
+ " reached");
 
-      needTservers = tserverSet.size() < minTserverCount;
+      needServers = numServers.get() < minServerCount;
 
       // suppress last message once threshold reached.
-      if (needTservers) {
-        tserverRetry.logRetry(log, String.format(
-            "Blocking for tserver availability - need to reach %s servers. 
Have %s Time spent blocking %s seconds.",
-            minTserverCount, tserverSet.size(), waitTimer.elapsed(SECONDS)));
+      if (needServers) {
+        serverRetry.logRetry(log, String.format(
+            "Blocking for %s availability - need to reach %s servers. Have %s 
Time spent blocking %s seconds.",
+            serverType, minServerCount, numServers.get(), 
waitTimer.elapsed(SECONDS)));
       }
-      tserverRetry.useRetry();
+      serverRetry.useRetry();
     }
 
-    if (tserverSet.size() < minTserverCount) {
+    if (numServers.get() < minServerCount) {
       log.warn(
-          "tserver availability check time expired - continuing. Requested {}, 
have {} tservers on line. "
+          "{} availability check time expired - continuing. Requested {}, have 
{} tservers on line. "
               + " Time waiting {} sec",
-          tserverSet.size(), minTserverCount, waitTimer.elapsed(SECONDS));
+          serverType, numServers.get(), minServerCount, 
waitTimer.elapsed(SECONDS));
 
     } else {
       log.info(
-          "tserver availability check completed. Requested {}, have {} 
tservers on line. "
+          "{} availability check completed. Requested {}, have {} tservers on 
line. "
               + " Time waiting {} sec",
-          tserverSet.size(), minTserverCount, waitTimer.elapsed(SECONDS));
+          serverType, numServers.get(), minServerCount, 
waitTimer.elapsed(SECONDS));
     }
   }
 
diff --git a/test/src/main/java/org/apache/accumulo/test/MultipleManagerIT.java 
b/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java
similarity index 99%
rename from test/src/main/java/org/apache/accumulo/test/MultipleManagerIT.java
rename to test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java
index e51b13c330..d0d8bdb5ed 100644
--- a/test/src/main/java/org/apache/accumulo/test/MultipleManagerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java
@@ -83,7 +83,7 @@ import com.google.common.net.HostAndPort;
  * </ul>
  *
  */
-public class MultipleManagerIT extends ConfigurableMacBase {
+public class MultipleManagerFateIT extends ConfigurableMacBase {
 
   // A manager that will quickly clean up fate reservations held by dead 
managers
   public static class FastFateCleanupManager extends Manager {
diff --git 
a/test/src/main/java/org/apache/accumulo/test/MultipleManagerStartupWaitIT.java 
b/test/src/main/java/org/apache/accumulo/test/MultipleManagerStartupWaitIT.java
new file mode 100644
index 0000000000..325ef97235
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/MultipleManagerStartupWaitIT.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ResourceGroupId;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.KeeperException;
+import org.junit.jupiter.api.Test;
+
+public class MultipleManagerStartupWaitIT extends ConfigurableMacBase {
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    // Set this lower so that locks timeout faster
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+    cfg.setProperty(Property.MANAGER_STARTUP_MANAGER_AVAIL_MIN_COUNT, "2");
+    cfg.setProperty(Property.MANAGER_STARTUP_MANAGER_AVAIL_MAX_WAIT, "10s");
+    super.configure(cfg, hadoopCoreSite);
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    // Overriding setup here so that the cluster
+    // is not started. We are going to start it
+    // manually in the test method.
+  }
+
+  private List<String> getAssistantManagers() throws KeeperException, 
InterruptedException {
+    String zAsstMgrPath = Constants.ZMANAGER_ASSISTANT_LOCK + "/" + 
ResourceGroupId.DEFAULT;
+    return 
getCluster().getServerContext().getZooSession().getChildren(zAsstMgrPath, null);
+  }
+
+  @Test
+  public void testManagerWait() throws Exception {
+
+    AtomicReference<Throwable> startError = new AtomicReference<>();
+    Thread clusterThread = new Thread(() -> {
+      try {
+        super.setUp();
+      } catch (Exception e) {
+        startError.set(e);
+      }
+    });
+    clusterThread.start();
+
+    // Wait a few seconds for processes to start and
+    // for ServerContext to be created
+    Thread.sleep(10_000);
+
+    // One Manager should be up and have acquired the assistant manager lock
+    Wait.waitFor(() -> getAssistantManagers().size() == 1);
+
+    // The Primary Manager lock should not be acquired yet
+    
assertNull(getCluster().getServerContext().getServerPaths().getManager(true));
+
+    // Start the 2nd Manager
+    getCluster().getConfig().getClusterServerConfiguration().setNumManagers(2);
+    getCluster().start();
+
+    // Wait for both Managers to acquire the assistant manager locks
+    Wait.waitFor(() -> getAssistantManagers().size() == 2);
+
+    // The Primary Manager lock should now be acquired
+    
assertNotNull(getCluster().getServerContext().getServerPaths().getManager(true));
+
+    List<String> managers = getAssistantManagers();
+    assertEquals(2, managers.size());
+
+    Set<ServerId> primary =
+        
getCluster().getServerContext().instanceOperations().getServers(ServerId.Type.MANAGER);
+    assertNotNull(primary);
+    assertEquals(1, primary.size());
+    
assertTrue(managers.contains(primary.iterator().next().toHostPortString()));
+    assertNull(startError.get());
+  }
+}

Reply via email to