This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit d70dfe4e322c5623240890e5f5c82385c138c6c5 Merge: 15ff873866 ce7705c542 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri Feb 28 19:16:48 2025 +0000 Merge branch '3.1' .../java/org/apache/accumulo/core/Constants.java | 2 + .../java/org/apache/accumulo/manager/Manager.java | 15 +- .../manager/upgrade/PreUpgradeValidation.java | 10 +- .../manager/upgrade/UpgradeCoordinator.java | 59 ++++- .../accumulo/manager/upgrade/UpgradeProgress.java | 80 +++++++ .../manager/upgrade/UpgradeProgressTracker.java | 174 +++++++++++++++ .../accumulo/manager/upgrade/AccumuloTest.java | 31 ++- .../manager/upgrade/UpgradeProgressTest.java | 72 ++++++ .../test/upgrade/UpgradeProgressTrackerIT.java | 243 +++++++++++++++++++++ 9 files changed, 657 insertions(+), 29 deletions(-) diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 45c0bc2595,b55d332ed4..b38fa024e7 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@@ -125,10 -114,8 +125,9 @@@ import org.apache.accumulo.manager.comp import org.apache.accumulo.manager.metrics.BalancerMetrics; import org.apache.accumulo.manager.metrics.ManagerMetrics; import org.apache.accumulo.manager.recovery.RecoveryManager; +import org.apache.accumulo.manager.split.Splitter; import org.apache.accumulo.manager.state.TableCounts; import org.apache.accumulo.manager.tableOps.TraceRepo; - import org.apache.accumulo.manager.upgrade.PreUpgradeValidation; import org.apache.accumulo.manager.upgrade.UpgradeCoordinator; import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.HighlyAvailableService; @@@ -458,10 -421,10 +458,11 @@@ public class Manager extends AbstractSe } } - protected Manager(ConfigOpts opts, String[] args) throws IOException { - super("manager", opts, args); + protected Manager(ConfigOpts opts, Function<SiteConfiguration,ServerContext> serverContextFactory, + String[] args) 
throws IOException { + super(ServerId.Type.MANAGER, opts, serverContextFactory, args); ServerContext context = super.getContext(); + upgradeCoordinator = new UpgradeCoordinator(context); balancerEnvironment = new BalancerEnvironmentImpl(context); AccumuloConfiguration aconf = context.getConfiguration(); @@@ -1170,20 -1278,17 +1171,23 @@@ } catch (KeeperException | InterruptedException e) { throw new IllegalStateException("Exception getting manager lock", e); } + // Setting the Manager state to HAVE_LOCK has the side-effect of + // starting the upgrade process if necessary. + setManagerState(ManagerState.HAVE_LOCK); - MetricsInfo metricsInfo = getContext().getMetricsInfo(); + // Set the HostName **after** initially creating the lock. The lock data is + // updated below with the correct address. This prevents clients from accessing + // the Manager until all of the internal processes are started. + setHostname(sa.address); - var producers = ManagerMetrics.getProducers(getConfiguration(), this); + MetricsInfo metricsInfo = getContext().getMetricsInfo(); + ManagerMetrics managerMetrics = new ManagerMetrics(getConfiguration(), this); + var producers = managerMetrics.getProducers(getConfiguration(), this); producers.add(balancerMetrics); + metricsInfo.addMetricsProducers(producers.toArray(new MetricsProducer[0])); metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(), - sa.getAddress(), "")); + sa.getAddress(), getResourceGroup())); recoveryManager = new RecoveryManager(this, timeToCacheRecoveryWalExistence); @@@ -1563,8 -1636,6 +1567,7 @@@ sleepUninterruptibly(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS, MILLISECONDS); } + this.getContext().setServiceLock(getManagerLock()); - setManagerState(ManagerState.HAVE_LOCK); return sld; } diff --cc server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index 13a8e8f67d,9cde40436d..400c8a07c8 --- 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@@ -128,14 -128,20 +128,21 @@@ public class UpgradeCoordinator private int currentVersion; // map of "current version" -> upgrader to next version. // Sorted so upgrades execute in order from the oldest supported data version to current - private final Map<Integer,Upgrader> upgraders = - Collections.unmodifiableMap(new TreeMap<>(Map.of(ROOT_TABLET_META_CHANGES, - new Upgrader10to11(), REMOVE_DEPRECATIONS_FOR_VERSION_3, new Upgrader11to12()))); + private final Map<Integer, + Upgrader> upgraders = Collections.unmodifiableMap(new TreeMap<>( + Map.of(ROOT_TABLET_META_CHANGES, new Upgrader10to11(), REMOVE_DEPRECATIONS_FOR_VERSION_3, + new Upgrader11to12(), METADATA_FILE_JSON_ENCODING, new Upgrader12to13()))); + private final ServerContext context; + private final UpgradeProgressTracker progressTracker; + private final PreUpgradeValidation preUpgradeValidator; + private volatile UpgradeStatus status; - public UpgradeCoordinator() { + public UpgradeCoordinator(ServerContext context) { + this.context = context; + progressTracker = new UpgradeProgressTracker(context); + preUpgradeValidator = new PreUpgradeValidation(); status = UpgradeStatus.INITIAL; } @@@ -312,15 -352,11 +353,15 @@@ */ @SuppressFBWarnings(value = "DM_EXIT", justification = "Want to immediately stop all manager threads on upgrade error") - private void abortIfFateTransactions(ServerContext context) { + private void abortIfFateTransactions() { try { - final ReadOnlyTStore<UpgradeCoordinator> fate = - new ZooStore<>(context.getZooKeeperRoot() + Constants.ZFATE, context.getZooSession()); - if (!fate.list().isEmpty()) { + // The current version of the code creates the new accumulo.fate table on upgrade, so no + // attempt is made to read it here. 
Attempting to read it at this point would likely cause a hang + // as tablets are not assigned when this is called. The Fate code is not used to read from + // zookeeper below because the serialization format changed in zookeeper, that is why a direct + // read is performed. + if (!context.getZooSession().asReader() + .getChildren(context.getZooKeeperRoot() + Constants.ZFATE).isEmpty()) { throw new AccumuloException("Aborting upgrade because there are" + " outstanding FATE transactions from a previous Accumulo version." + " You can start the tservers and then use the shell to delete completed "