This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 8f92187c59c1d5e269d443882a8d49d4868a8bf6
Merge: f106e10153 63dd6503ad
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Sun Jul 28 22:24:43 2024 -0400

    Merge branch 'main' into elasticity

 .../java/org/apache/accumulo/manager/Manager.java  | 84 ++++++++++------------
 1 file changed, 39 insertions(+), 45 deletions(-)

diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 260f1823f5,968e5c1c98..0690e93afa
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -50,8 -50,6 +50,7 @@@ import java.util.concurrent.CountDownLa
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Future;
- import java.util.concurrent.ScheduledFuture;
 +import java.util.concurrent.ThreadPoolExecutor;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicReference;
@@@ -235,16 -223,11 +234,15 @@@ public class Manager extends AbstractSe
    final ServerBulkImportStatus bulkImportStatus = new 
ServerBulkImportStatus();
  
    private final AtomicBoolean managerInitialized = new AtomicBoolean(false);
-   private final AtomicBoolean managerUpgrading = new AtomicBoolean(false);
 -  private final long timeToCacheRecoveryWalExistence;
  
 +  private final long timeToCacheRecoveryWalExistence;
    private ExecutorService tableInformationStatusPool = null;
 +  private ThreadPoolExecutor tabletRefreshThreadPool;
 +
 +  private final TabletStateStore rootTabletStore;
 +  private final TabletStateStore metadataTabletStore;
 +  private final TabletStateStore userTabletStore;
  
 -  @Override
    public synchronized ManagerState getManagerState() {
      return state;
    }
@@@ -1140,18 -1248,11 +1138,12 @@@
        throw new IllegalStateException("Exception getting manager lock", e);
      }
  
-     // If UpgradeStatus is not at complete by this moment, then things are 
currently
-     // upgrading.
-     if (upgradeCoordinator.getStatus() != 
UpgradeCoordinator.UpgradeStatus.COMPLETE) {
-       managerUpgrading.set(true);
-     }
- 
      MetricsInfo metricsInfo = getContext().getMetricsInfo();
 -    metricsInfo.addServiceTags(getApplicationName(), sa.getAddress());
 -
 -    var producers = ManagerMetrics.getProducers(getConfiguration(), this);
 +    metricsInfo.addServiceTags(getApplicationName(), sa.getAddress(), 
getResourceGroup());
 +    ManagerMetrics managerMetrics = new ManagerMetrics(getConfiguration(), 
this);
 +    var producers = managerMetrics.getProducers(getConfiguration(), this);
      producers.add(balancerMetrics);
 +
      metricsInfo.addMetricsProducers(producers.toArray(new 
MetricsProducer[0]));
      metricsInfo.init();
  
@@@ -1251,18 -1340,22 +1241,22 @@@
        throw new IllegalStateException("Metadata upgrade failed", e);
      }
  
+     // Everything should be fully upgraded by this point, but check before 
starting fate
+     if (isUpgrading()) {
+       throw new IllegalStateException("Upgrade coordinator is unexpectedly 
not complete");
+     }
      try {
 -      final AgeOffStore<Manager> store = new AgeOffStore<>(
 -          new org.apache.accumulo.core.fate.ZooStore<>(getZooKeeperRoot() + 
Constants.ZFATE,
 -              context.getZooReaderWriter()),
 -          HOURS.toMillis(8), System::currentTimeMillis);
 -
 -      Fate<Manager> f = initializeFateInstance(store, getConfiguration());
 -      fateRef.set(f);
 +      var metaInstance = initializeFateInstance(context,
 +          new MetaFateStore<>(getZooKeeperRoot() + Constants.ZFATE, 
context.getZooReaderWriter()));
 +      var userInstance = initializeFateInstance(context,
 +          new UserFateStore<>(context, AccumuloTable.FATE.tableName()));
 +
 +      if (!fateRefs.compareAndSet(null,
 +          Map.of(FateInstanceType.META, metaInstance, FateInstanceType.USER, 
userInstance))) {
 +        throw new IllegalStateException(
 +            "Unexpected previous fate reference map already initialized");
 +      }
        fateReadyLatch.countDown();
 -
 -      ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
 -          .scheduleWithFixedDelay(store::ageOff, 63000, 63000, MILLISECONDS));
      } catch (KeeperException | InterruptedException e) {
        throw new IllegalStateException("Exception setting up FaTE cleanup 
thread", e);
      }

Reply via email to