This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 763367fed3 Don't fully start some servers until upgrade is complete. (#5378)
763367fed3 is described below

commit 763367fed3a2f86b66a6c810fc707c3e5d8fee9c
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Mar 6 13:55:21 2025 -0500

    Don't fully start some servers until upgrade is complete. (#5378)
    
    Prevent the CompactionCoordinator, GarbageCollector, and
    ScanServer from fully starting until the current version
    of software matches the version stored on disk. Backported
    AccumuloDataVersion.getCurrentVersion for this change.
    
    Closes #5367
---
 .../java/org/apache/accumulo/server/AbstractServer.java    |  7 +++++++
 .../org/apache/accumulo/server/AccumuloDataVersion.java    | 14 ++++++++++++++
 .../apache/accumulo/coordinator/CompactionCoordinator.java |  9 +++++++++
 .../accumulo/coordinator/CompactionCoordinatorTest.java    |  4 ++++
 .../org/apache/accumulo/gc/SimpleGarbageCollector.java     |  8 ++++++++
 .../accumulo/manager/upgrade/UpgradeCoordinator.java       |  4 +---
 .../main/java/org/apache/accumulo/tserver/ScanServer.java  | 13 ++++++++++++-
 7 files changed, 55 insertions(+), 4 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index 69378845e3..1ab7c186c3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@ -263,4 +263,11 @@ public abstract class AbstractServer
   @Override
   public void close() {}
 
+  protected void waitForUpgrade() throws InterruptedException {
+    while (AccumuloDataVersion.getCurrentVersion(getContext()) < AccumuloDataVersion.get()) {
+      LOG.info("Waiting for upgrade to complete.");
+      Thread.sleep(1000);
+    }
+  }
+
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java b/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java
index 666f1136d3..11b3deefd6 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java
@@ -78,4 +78,18 @@ public class AccumuloDataVersion {
 
   public static final Set<Integer> CAN_RUN =
       Set.of(SHORTEN_RFILE_KEYS, CRYPTO_CHANGES, CURRENT_VERSION);
+
+  /**
+   * Get the stored, current working version.
+   *
+   * @param context the server context
+   * @return the stored data version
+   */
+  public static int getCurrentVersion(ServerContext context) {
+    int cv =
+        context.getServerDirs().getAccumuloPersistentVersion(context.getVolumeManager().getFirst());
+    ServerContext.ensureDataVersionCompatible(cv);
+    return cv;
+  }
+
 }
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 5aad919206..4b49e88415 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -101,6 +101,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.collect.Sets;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.micrometer.core.instrument.Tag;
 
 public class CompactionCoordinator extends AbstractServer implements
@@ -265,8 +266,16 @@ public class CompactionCoordinator extends AbstractServer implements
   }
 
   @Override
+  @SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit")
   public void run() {
 
+    try {
+      waitForUpgrade();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while waiting for upgrade to complete, exiting...");
+      System.exit(1);
+    }
+
     ServerAddress coordinatorAddress = null;
     try {
       coordinatorAddress = startCoordinatorClientService();
diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 8c42064274..1f62ede587 100644
--- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -224,6 +224,10 @@ public class CompactionCoordinatorTest {
     public Collection<Tag> getServiceTags(HostAndPort clientAddr) {
       return List.of();
     }
+
+    @Override
+    protected void waitForUpgrade() throws InterruptedException {}
+
   }
 
   @Test
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 7d7096a860..179f97cce9 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -152,6 +152,14 @@ public class SimpleGarbageCollector extends AbstractServer
   @Override
   @SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit")
   public void run() {
+
+    try {
+      waitForUpgrade();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while waiting for upgrade to complete, exiting...");
+      System.exit(1);
+    }
+
     final VolumeManager fs = getContext().getVolumeManager();
 
     // Sleep for an initial period, giving the manager time to start up and
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
index 2b93d4e84f..1f593d16fe 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
@@ -160,9 +160,7 @@ public class UpgradeCoordinator {
         "Not currently in a suitable state to do zookeeper upgrade %s", status);
 
     try {
-      int cv = context.getServerDirs()
-          .getAccumuloPersistentVersion(context.getVolumeManager().getFirst());
-      ServerContext.ensureDataVersionCompatible(cv);
+      int cv = AccumuloDataVersion.getCurrentVersion(context);
       this.currentVersion = cv;
 
       if (cv == AccumuloDataVersion.get()) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 245302e14f..19aade0113 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -127,6 +127,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
 public class ScanServer extends AbstractServer
     implements TabletScanClientService.Iface, TabletHostingServer, ServerProcessService.Iface {
 
@@ -375,7 +377,16 @@ public class ScanServer extends AbstractServer
   }
 
   @Override
+  @SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit")
   public void run() {
+
+    try {
+      waitForUpgrade();
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while waiting for upgrade to complete, exiting...");
+      System.exit(1);
+    }
+
     SecurityUtil.serverLogin(getConfiguration());
 
     ServerAddress address = null;
@@ -383,7 +394,7 @@ public class ScanServer extends AbstractServer
       address = startScanServerClientService();
       clientAddress = address.getAddress();
     } catch (UnknownHostException e1) {
-      throw new RuntimeException("Failed to start the compactor client service", e1);
+      throw new RuntimeException("Failed to start the scan server client service", e1);
     }
 
     MetricsInfo metricsInfo = getContext().getMetricsInfo();

Reply via email to