This is an automated email from the ASF dual-hosted git repository. edcoleman pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 0148294e35dbcf502fb0f9df6a178d669e4c8f3f Merge: 7b3e52f1ae 311c5fe821 Author: Ed Coleman <edcole...@apache.org> AuthorDate: Fri Mar 8 19:02:50 2024 +0000 Merge remote-tracking branch 'upstream/main' into elasticity Additional changes from merge * Unused type variable in initializeFateInstance * unneeded initialization .../java/org/apache/accumulo/manager/Manager.java | 23 +++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 95fb00777e,bdf73b9191..6758acfc1f --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@@ -243,20 -231,6 +243,20 @@@ public class Manager extends AbstractSe return state; } + // ELASTICITIY_TODO it would be nice if this method could take DataLevel as an argument and only + // retrieve information about compactions in that data level. Attempted this and a lot of + // refactoring was needed to get that small bit of information to this method. Would be best to + // address this after issue. May be best to attempt this after #3576. + public Map<FateId,Map<String,String>> getCompactionHints() { - Map<FateId,CompactionConfig> allConfig = null; ++ Map<FateId,CompactionConfig> allConfig; + try { + allConfig = CompactionConfigStorage.getAllConfig(getContext(), tableId -> true); + } catch (InterruptedException | KeeperException e) { + throw new RuntimeException(e); + } + return Maps.transformValues(allConfig, CompactionConfig::getExecutionHints); + } + public boolean stillManager() { return getManagerState() != ManagerState.STOP; } @@@ -620,9 -757,9 +620,9 @@@ @Override public void run() { - EventCoordinator.Listener eventListener = nextEvent.getListener(); + EventCoordinator.Tracker eventTracker = nextEvent.getTracker(); while (stillManager()) { - long wait = DEFAULT_WAIT_FOR_WATCHER; + long wait; try { switch (getManagerGoalState()) { case NORMAL: @@@ -960,7 -1088,7 +960,7 @@@ log.info("Started Manager client service at {}", sa.address); // block until we can obtain the ZK lock for the manager -- ServiceLockData sld = null; ++ ServiceLockData sld; try { sld = getManagerLock(ServiceLock.path(zroot + Constants.ZMANAGER_LOCK)); } catch (KeeperException | InterruptedException e) { @@@ -1072,17 -1197,17 +1072,17 @@@ } try { - var metaInstance = initializeFateInstance(context, FateInstanceType.META, - final AgeOffStore<Manager> store = new AgeOffStore<>( - new org.apache.accumulo.core.fate.ZooStore<>(getZooKeeperRoot() + Constants.ZFATE, - context.getZooReaderWriter()), - HOURS.toMillis(8), System::currentTimeMillis); - - Fate<Manager> f = new Fate<>(this, store, TraceRepo::toLogString, getConfiguration()); - fateRef.set(f); ++ var metaInstance = initializeFateInstance(context, + new ZooStore<>(getZooKeeperRoot() + Constants.ZFATE, context.getZooReaderWriter())); - var userInstance = initializeFateInstance(context, FateInstanceType.USER, ++ var userInstance = initializeFateInstance(context, + new AccumuloStore<>(context, AccumuloTable.FATE.tableName())); + + if (!fateRefs.compareAndSet(null, + Map.of(FateInstanceType.META, metaInstance, FateInstanceType.USER, userInstance))) { + throw new IllegalStateException( + "Unexpected previous fate reference map already initialized"); + } fateReadyLatch.countDown(); - - ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() - .scheduleWithFixedDelay(store::ageOff, 63000, 63000, MILLISECONDS)); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException("Exception setting up FaTE cleanup thread", e); } @@@ -1117,15 -1242,9 +1117,15 @@@ } String address = sa.address.toString(); - sld = new ServiceLockData(sld.getServerUUID(ThriftService.MANAGER), address, - ThriftService.MANAGER); - log.info("Setting manager lock data to {}", sld.toString()); + UUID uuid = sld.getServerUUID(ThriftService.MANAGER); + ServiceDescriptors descriptors = new ServiceDescriptors(); + for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, ThriftService.COORDINATOR, + ThriftService.FATE}) { + descriptors.addService(new ServiceDescriptor(uuid, svc, address, this.getResourceGroup())); + } + + sld = new ServiceLockData(descriptors); - log.info("Setting manager lock data to {}", sld.toString()); ++ log.info("Setting manager lock data to {}", sld); try { managerLock.replaceLockData(sld); } catch (KeeperException | InterruptedException e) { @@@ -1182,19 -1297,6 +1182,18 @@@ log.info("exiting"); } - private Fate<Manager> initializeFateInstance(ServerContext context, FateInstanceType type, - FateStore<Manager> store) { ++ private Fate<Manager> initializeFateInstance(ServerContext context, FateStore<Manager> store) { + + final Fate<Manager> fateInstance = + new Fate<>(this, store, TraceRepo::toLogString, getConfiguration()); + + var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), System::nanoTime); + ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() + .scheduleWithFixedDelay(fateCleaner::ageOff, 10, 4 * 60, MINUTES)); + + return fateInstance; + } + /** * Allows property configuration to block manager start-up waiting for a minimum number of * tservers to register in zookeeper. It also accepts a maximum time to wait - if the time