This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 8f92187c59c1d5e269d443882a8d49d4868a8bf6 Merge: f106e10153 63dd6503ad Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Sun Jul 28 22:24:43 2024 -0400 Merge branch 'main' into elasticity .../java/org/apache/accumulo/manager/Manager.java | 84 ++++++++++------------ 1 file changed, 39 insertions(+), 45 deletions(-) diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 260f1823f5,968e5c1c98..0690e93afa --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@@ -50,8 -50,6 +50,7 @@@ import java.util.concurrent.CountDownLa import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; - import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@@ -235,16 -223,11 +234,15 @@@ public class Manager extends AbstractSe final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus(); private final AtomicBoolean managerInitialized = new AtomicBoolean(false); - private final AtomicBoolean managerUpgrading = new AtomicBoolean(false); - private final long timeToCacheRecoveryWalExistence; + private final long timeToCacheRecoveryWalExistence; private ExecutorService tableInformationStatusPool = null; + private ThreadPoolExecutor tabletRefreshThreadPool; + + private final TabletStateStore rootTabletStore; + private final TabletStateStore metadataTabletStore; + private final TabletStateStore userTabletStore; - @Override public synchronized ManagerState getManagerState() { return state; } @@@ -1140,18 -1248,11 +1138,12 @@@ throw new IllegalStateException("Exception getting manager lock", e); } - // If UpgradeStatus is not at complete by this moment, then things are currently - // 
upgrading. - if (upgradeCoordinator.getStatus() != UpgradeCoordinator.UpgradeStatus.COMPLETE) { - managerUpgrading.set(true); - } - MetricsInfo metricsInfo = getContext().getMetricsInfo(); - metricsInfo.addServiceTags(getApplicationName(), sa.getAddress()); - - var producers = ManagerMetrics.getProducers(getConfiguration(), this); + metricsInfo.addServiceTags(getApplicationName(), sa.getAddress(), getResourceGroup()); + ManagerMetrics managerMetrics = new ManagerMetrics(getConfiguration(), this); + var producers = managerMetrics.getProducers(getConfiguration(), this); producers.add(balancerMetrics); + metricsInfo.addMetricsProducers(producers.toArray(new MetricsProducer[0])); metricsInfo.init(); @@@ -1251,18 -1340,22 +1241,22 @@@ throw new IllegalStateException("Metadata upgrade failed", e); } + // Everything should be fully upgraded by this point, but check before starting fate + if (isUpgrading()) { + throw new IllegalStateException("Upgrade coordinator is unexpectedly not complete"); + } try { - final AgeOffStore<Manager> store = new AgeOffStore<>( - new org.apache.accumulo.core.fate.ZooStore<>(getZooKeeperRoot() + Constants.ZFATE, - context.getZooReaderWriter()), - HOURS.toMillis(8), System::currentTimeMillis); - - Fate<Manager> f = initializeFateInstance(store, getConfiguration()); - fateRef.set(f); + var metaInstance = initializeFateInstance(context, + new MetaFateStore<>(getZooKeeperRoot() + Constants.ZFATE, context.getZooReaderWriter())); + var userInstance = initializeFateInstance(context, + new UserFateStore<>(context, AccumuloTable.FATE.tableName())); + + if (!fateRefs.compareAndSet(null, + Map.of(FateInstanceType.META, metaInstance, FateInstanceType.USER, userInstance))) { + throw new IllegalStateException( + "Unexpected previous fate reference map already initialized"); + } fateReadyLatch.countDown(); - - ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() - .scheduleWithFixedDelay(store::ageOff, 63000, 63000, MILLISECONDS)); } catch 
(KeeperException | InterruptedException e) { throw new IllegalStateException("Exception setting up FaTE cleanup thread", e); }