This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit d70dfe4e322c5623240890e5f5c82385c138c6c5
Merge: 15ff873866 ce7705c542
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Fri Feb 28 19:16:48 2025 +0000

    Merge branch '3.1'

 .../java/org/apache/accumulo/core/Constants.java   |   2 +
 .../java/org/apache/accumulo/manager/Manager.java  |  15 +-
 .../manager/upgrade/PreUpgradeValidation.java      |  10 +-
 .../manager/upgrade/UpgradeCoordinator.java        |  59 ++++-
 .../accumulo/manager/upgrade/UpgradeProgress.java  |  80 +++++++
 .../manager/upgrade/UpgradeProgressTracker.java    | 174 +++++++++++++++
 .../accumulo/manager/upgrade/AccumuloTest.java     |  31 ++-
 .../manager/upgrade/UpgradeProgressTest.java       |  72 ++++++
 .../test/upgrade/UpgradeProgressTrackerIT.java     | 243 +++++++++++++++++++++
 9 files changed, 657 insertions(+), 29 deletions(-)

diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 45c0bc2595,b55d332ed4..b38fa024e7
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -125,10 -114,8 +125,9 @@@ import org.apache.accumulo.manager.comp
  import org.apache.accumulo.manager.metrics.BalancerMetrics;
  import org.apache.accumulo.manager.metrics.ManagerMetrics;
  import org.apache.accumulo.manager.recovery.RecoveryManager;
 +import org.apache.accumulo.manager.split.Splitter;
  import org.apache.accumulo.manager.state.TableCounts;
  import org.apache.accumulo.manager.tableOps.TraceRepo;
- import org.apache.accumulo.manager.upgrade.PreUpgradeValidation;
  import org.apache.accumulo.manager.upgrade.UpgradeCoordinator;
  import org.apache.accumulo.server.AbstractServer;
  import org.apache.accumulo.server.HighlyAvailableService;
@@@ -458,10 -421,10 +458,11 @@@ public class Manager extends AbstractSe
      }
    }
  
 -  protected Manager(ConfigOpts opts, String[] args) throws IOException {
 -    super("manager", opts, args);
 +  protected Manager(ConfigOpts opts, 
Function<SiteConfiguration,ServerContext> serverContextFactory,
 +      String[] args) throws IOException {
 +    super(ServerId.Type.MANAGER, opts, serverContextFactory, args);
      ServerContext context = super.getContext();
+     upgradeCoordinator = new UpgradeCoordinator(context);
      balancerEnvironment = new BalancerEnvironmentImpl(context);
  
      AccumuloConfiguration aconf = context.getConfiguration();
@@@ -1170,20 -1278,17 +1171,23 @@@
      } catch (KeeperException | InterruptedException e) {
        throw new IllegalStateException("Exception getting manager lock", e);
      }
+     // Setting the Manager state to HAVE_LOCK has the side-effect of
+     // starting the upgrade process if necessary.
+     setManagerState(ManagerState.HAVE_LOCK);
  
 -    MetricsInfo metricsInfo = getContext().getMetricsInfo();
 +    // Set the HostName **after** initially creating the lock. The lock data 
is
 +    // updated below with the correct address. This prevents clients from 
accessing
 +    // the Manager until all of the internal processes are started.
 +    setHostname(sa.address);
  
 -    var producers = ManagerMetrics.getProducers(getConfiguration(), this);
 +    MetricsInfo metricsInfo = getContext().getMetricsInfo();
 +    ManagerMetrics managerMetrics = new ManagerMetrics(getConfiguration(), 
this);
 +    var producers = managerMetrics.getProducers(getConfiguration(), this);
      producers.add(balancerMetrics);
 +
      metricsInfo.addMetricsProducers(producers.toArray(new 
MetricsProducer[0]));
      metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), 
getApplicationName(),
 -        sa.getAddress(), ""));
 +        sa.getAddress(), getResourceGroup()));
  
      recoveryManager = new RecoveryManager(this, 
timeToCacheRecoveryWalExistence);
  
@@@ -1563,8 -1636,6 +1567,7 @@@
        sleepUninterruptibly(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS, MILLISECONDS);
      }
  
 +    this.getContext().setServiceLock(getManagerLock());
-     setManagerState(ManagerState.HAVE_LOCK);
      return sld;
    }
  
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
index 13a8e8f67d,9cde40436d..400c8a07c8
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
@@@ -128,14 -128,20 +128,21 @@@ public class UpgradeCoordinator 
    private int currentVersion;
    // map of "current version" -> upgrader to next version.
    // Sorted so upgrades execute in order from the oldest supported data 
version to current
 -  private final Map<Integer,Upgrader> upgraders =
 -      Collections.unmodifiableMap(new 
TreeMap<>(Map.of(ROOT_TABLET_META_CHANGES,
 -          new Upgrader10to11(), REMOVE_DEPRECATIONS_FOR_VERSION_3, new 
Upgrader11to12())));
 +  private final Map<Integer,
 +      Upgrader> upgraders = Collections.unmodifiableMap(new TreeMap<>(
 +          Map.of(ROOT_TABLET_META_CHANGES, new Upgrader10to11(), 
REMOVE_DEPRECATIONS_FOR_VERSION_3,
 +              new Upgrader11to12(), METADATA_FILE_JSON_ENCODING, new 
Upgrader12to13())));
  
+   private final ServerContext context;
+   private final UpgradeProgressTracker progressTracker;
+   private final PreUpgradeValidation preUpgradeValidator;
+ 
    private volatile UpgradeStatus status;
  
-   public UpgradeCoordinator() {
+   public UpgradeCoordinator(ServerContext context) {
+     this.context = context;
+     progressTracker = new UpgradeProgressTracker(context);
+     preUpgradeValidator = new PreUpgradeValidation();
      status = UpgradeStatus.INITIAL;
    }
  
@@@ -312,15 -352,11 +353,15 @@@
     */
    @SuppressFBWarnings(value = "DM_EXIT",
        justification = "Want to immediately stop all manager threads on 
upgrade error")
-   private void abortIfFateTransactions(ServerContext context) {
+   private void abortIfFateTransactions() {
      try {
 -      final ReadOnlyTStore<UpgradeCoordinator> fate =
 -          new ZooStore<>(context.getZooKeeperRoot() + Constants.ZFATE, 
context.getZooSession());
 -      if (!fate.list().isEmpty()) {
 +      // The current version of the code creates the new accumulo.fate table 
on upgrade, so no
 +      // attempt is made to read it here. Attempting to read it at this point 
would likely cause a hang
 +      // as tablets are not assigned when this is called. The Fate code is 
not used to read from
 +      // zookeeper below because the serialization format changed in 
zookeeper, that is why a direct
 +      // read is performed.
 +      if (!context.getZooSession().asReader()
 +          .getChildren(context.getZooKeeperRoot() + 
Constants.ZFATE).isEmpty()) {
          throw new AccumuloException("Aborting upgrade because there are"
              + " outstanding FATE transactions from a previous Accumulo 
version."
              + " You can start the tservers and then use the shell to delete 
completed "

Reply via email to