This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 0148294e35dbcf502fb0f9df6a178d669e4c8f3f
Merge: 7b3e52f1ae 311c5fe821
Author: Ed Coleman <edcole...@apache.org>
AuthorDate: Fri Mar 8 19:02:50 2024 +0000

    Merge remote-tracking branch 'upstream/main' into elasticity
    
    Additional changes from merge
     * Remove unused FateInstanceType parameter from initializeFateInstance
     * Remove unneeded initialization of local variables

 .../java/org/apache/accumulo/manager/Manager.java  | 23 +++++++++++-----------
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 95fb00777e,bdf73b9191..6758acfc1f
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -243,20 -231,6 +243,20 @@@ public class Manager extends AbstractSe
      return state;
    }
  
 +  // ELASTICITIY_TODO it would be nice if this method could take DataLevel as an argument and only
 +  // retrieve information about compactions in that data level. Attempted this and a lot of
 +  // refactoring was needed to get that small bit of information to this method. Would be best to
 +  // address this after issue. May be best to attempt this after #3576.
 +  public Map<FateId,Map<String,String>> getCompactionHints() {
-     Map<FateId,CompactionConfig> allConfig = null;
++    Map<FateId,CompactionConfig> allConfig;
 +    try {
 +      allConfig = CompactionConfigStorage.getAllConfig(getContext(), tableId 
-> true);
 +    } catch (InterruptedException | KeeperException e) {
 +      throw new RuntimeException(e);
 +    }
 +    return Maps.transformValues(allConfig, 
CompactionConfig::getExecutionHints);
 +  }
 +
    public boolean stillManager() {
      return getManagerState() != ManagerState.STOP;
    }
@@@ -620,9 -757,9 +620,9 @@@
  
      @Override
      public void run() {
 -      EventCoordinator.Listener eventListener = nextEvent.getListener();
 +      EventCoordinator.Tracker eventTracker = nextEvent.getTracker();
        while (stillManager()) {
-         long wait = DEFAULT_WAIT_FOR_WATCHER;
+         long wait;
          try {
            switch (getManagerGoalState()) {
              case NORMAL:
@@@ -960,7 -1088,7 +960,7 @@@
      log.info("Started Manager client service at {}", sa.address);
  
      // block until we can obtain the ZK lock for the manager
--    ServiceLockData sld = null;
++    ServiceLockData sld;
      try {
        sld = getManagerLock(ServiceLock.path(zroot + Constants.ZMANAGER_LOCK));
      } catch (KeeperException | InterruptedException e) {
@@@ -1072,17 -1197,17 +1072,17 @@@
      }
  
      try {
-       var metaInstance = initializeFateInstance(context, 
FateInstanceType.META,
 -      final AgeOffStore<Manager> store = new AgeOffStore<>(
 -          new org.apache.accumulo.core.fate.ZooStore<>(getZooKeeperRoot() + 
Constants.ZFATE,
 -              context.getZooReaderWriter()),
 -          HOURS.toMillis(8), System::currentTimeMillis);
 -
 -      Fate<Manager> f = new Fate<>(this, store, TraceRepo::toLogString, 
getConfiguration());
 -      fateRef.set(f);
++      var metaInstance = initializeFateInstance(context,
 +          new ZooStore<>(getZooKeeperRoot() + Constants.ZFATE, 
context.getZooReaderWriter()));
-       var userInstance = initializeFateInstance(context, 
FateInstanceType.USER,
++      var userInstance = initializeFateInstance(context,
 +          new AccumuloStore<>(context, AccumuloTable.FATE.tableName()));
 +
 +      if (!fateRefs.compareAndSet(null,
 +          Map.of(FateInstanceType.META, metaInstance, FateInstanceType.USER, 
userInstance))) {
 +        throw new IllegalStateException(
 +            "Unexpected previous fate reference map already initialized");
 +      }
        fateReadyLatch.countDown();
 -
 -      ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
 -          .scheduleWithFixedDelay(store::ageOff, 63000, 63000, MILLISECONDS));
      } catch (KeeperException | InterruptedException e) {
        throw new IllegalStateException("Exception setting up FaTE cleanup 
thread", e);
      }
@@@ -1117,15 -1242,9 +1117,15 @@@
      }
  
      String address = sa.address.toString();
 -    sld = new ServiceLockData(sld.getServerUUID(ThriftService.MANAGER), 
address,
 -        ThriftService.MANAGER);
 -    log.info("Setting manager lock data to {}", sld.toString());
 +    UUID uuid = sld.getServerUUID(ThriftService.MANAGER);
 +    ServiceDescriptors descriptors = new ServiceDescriptors();
 +    for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, 
ThriftService.COORDINATOR,
 +        ThriftService.FATE}) {
 +      descriptors.addService(new ServiceDescriptor(uuid, svc, address, 
this.getResourceGroup()));
 +    }
 +
 +    sld = new ServiceLockData(descriptors);
-     log.info("Setting manager lock data to {}", sld.toString());
++    log.info("Setting manager lock data to {}", sld);
      try {
        managerLock.replaceLockData(sld);
      } catch (KeeperException | InterruptedException e) {
@@@ -1182,19 -1297,6 +1182,18 @@@
      log.info("exiting");
    }
  
-   private Fate<Manager> initializeFateInstance(ServerContext context, 
FateInstanceType type,
-       FateStore<Manager> store) {
++  private Fate<Manager> initializeFateInstance(ServerContext context, 
FateStore<Manager> store) {
 +
 +    final Fate<Manager> fateInstance =
 +        new Fate<>(this, store, TraceRepo::toLogString, getConfiguration());
 +
 +    var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), 
System::nanoTime);
 +    ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
 +        .scheduleWithFixedDelay(fateCleaner::ageOff, 10, 4 * 60, MINUTES));
 +
 +    return fateInstance;
 +  }
 +
    /**
     * Allows property configuration to block manager start-up waiting for a 
minimum number of
     * tservers to register in zookeeper. It also accepts a maximum time to 
wait - if the time

Reply via email to