This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 763367fed3 Don't fully start some servers until upgrade is complete. (#5378) 763367fed3 is described below commit 763367fed3a2f86b66a6c810fc707c3e5d8fee9c Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu Mar 6 13:55:21 2025 -0500 Don't fully start some servers until upgrade is complete. (#5378) Prevent the CompactionCoordinator, GarbageCollector, and ScanServer from fully starting until the current version of software matches the version stored on disk. Backported AccumuloDataVersion.getCurrentVersion for this change. Closes #5367 --- .../java/org/apache/accumulo/server/AbstractServer.java | 7 +++++++ .../org/apache/accumulo/server/AccumuloDataVersion.java | 14 ++++++++++++++ .../apache/accumulo/coordinator/CompactionCoordinator.java | 9 +++++++++ .../accumulo/coordinator/CompactionCoordinatorTest.java | 4 ++++ .../org/apache/accumulo/gc/SimpleGarbageCollector.java | 8 ++++++++ .../accumulo/manager/upgrade/UpgradeCoordinator.java | 4 +--- .../main/java/org/apache/accumulo/tserver/ScanServer.java | 13 ++++++++++++- 7 files changed, 55 insertions(+), 4 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index 69378845e3..1ab7c186c3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@ -263,4 +263,11 @@ public abstract class AbstractServer @Override public void close() {} + protected void waitForUpgrade() throws InterruptedException { + while (AccumuloDataVersion.getCurrentVersion(getContext()) < AccumuloDataVersion.get()) { + LOG.info("Waiting for upgrade to complete."); + Thread.sleep(1000); + } + } + } diff --git a/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java b/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java index 666f1136d3..11b3deefd6 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java @@ -78,4 +78,18 @@ public class AccumuloDataVersion { public static final Set<Integer> CAN_RUN = Set.of(SHORTEN_RFILE_KEYS, CRYPTO_CHANGES, CURRENT_VERSION); + + /** + * Get the stored, current working version. + * + * @param context the server context + * @return the stored data version + */ + public static int getCurrentVersion(ServerContext context) { + int cv = + context.getServerDirs().getAccumuloPersistentVersion(context.getVolumeManager().getFirst()); + ServerContext.ensureDataVersionCompatible(cv); + return cv; + } + } diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 5aad919206..4b49e88415 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -101,6 +101,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.collect.Sets; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.micrometer.core.instrument.Tag; public class CompactionCoordinator extends AbstractServer implements @@ -265,8 +266,16 @@ public class CompactionCoordinator extends AbstractServer implements } @Override + @SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit") public void run() { + try { + waitForUpgrade(); + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for upgrade to complete, exiting..."); + System.exit(1); + } + ServerAddress coordinatorAddress = null; try { coordinatorAddress = startCoordinatorClientService(); diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java index 8c42064274..1f62ede587 100644 --- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java +++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java @@ -224,6 +224,10 @@ public class CompactionCoordinatorTest { public Collection<Tag> getServiceTags(HostAndPort clientAddr) { return List.of(); } + + @Override + protected void waitForUpgrade() throws InterruptedException {} + } @Test diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 7d7096a860..179f97cce9 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -152,6 +152,14 @@ public class SimpleGarbageCollector extends AbstractServer @Override @SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit") public void run() { + + try { + waitForUpgrade(); + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for upgrade to complete, exiting..."); + System.exit(1); + } + final VolumeManager fs = getContext().getVolumeManager(); // Sleep for an initial period, giving the manager time to start up and diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index 2b93d4e84f..1f593d16fe 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@ -160,9 +160,7 @@ public class UpgradeCoordinator { "Not currently in a suitable state to do zookeeper upgrade %s", status); try { - int cv = context.getServerDirs() - .getAccumuloPersistentVersion(context.getVolumeManager().getFirst()); - ServerContext.ensureDataVersionCompatible(cv); + int cv = AccumuloDataVersion.getCurrentVersion(context); this.currentVersion = cv; if (cv == AccumuloDataVersion.get()) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 245302e14f..19aade0113 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -127,6 +127,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + public class ScanServer extends AbstractServer implements TabletScanClientService.Iface, TabletHostingServer, ServerProcessService.Iface { @@ -375,7 +377,16 @@ public class ScanServer extends AbstractServer } @Override + @SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit") public void run() { + + try { + waitForUpgrade(); + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for upgrade to complete, exiting..."); + System.exit(1); + } + SecurityUtil.serverLogin(getConfiguration()); ServerAddress address = null; @@ -383,7 +394,7 @@ public class ScanServer extends AbstractServer address = startScanServerClientService(); clientAddress = address.getAddress(); } catch (UnknownHostException e1) { - throw new RuntimeException("Failed to start the compactor client service", e1); + throw new RuntimeException("Failed to start the scan server client service", e1); } MetricsInfo metricsInfo = getContext().getMetricsInfo();