This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 98cbd85f856dca0cbbc6e4fbd6a4febded2e03cb
Merge: 6636feab60 7b1182192d
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed Jan 15 21:16:15 2025 +0000

    Merge branch '2.1' into 3.1

 .../accumulo/core/fate/zookeeper/ZooCache.java     | 28 +++++++++++++++
 .../MiniAccumuloClusterControl.java                |  6 ++++
 .../miniclusterImpl/MiniAccumuloClusterImpl.java   | 40 ++++++++++++++++++++++
 .../org/apache/accumulo/server/util/ZooZap.java    | 17 ++++++---
 4 files changed, 87 insertions(+), 4 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java
index e24448585e,d7f924f576..a490a03cfd
--- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java
@@@ -32,12 -30,9 +32,13 @@@ import java.util.concurrent.locks.Lock
  import java.util.concurrent.locks.LockSupport;
  import java.util.concurrent.locks.ReadWriteLock;
  import java.util.concurrent.locks.ReentrantReadWriteLock;
 +import java.util.function.Consumer;
+ import java.util.function.Predicate;
  
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath;
 +import org.apache.accumulo.core.lock.ServiceLock;
 +import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath;
 +import org.apache.accumulo.core.lock.ServiceLockData;
 +import org.apache.accumulo.core.zookeeper.ZooSession;
  import org.apache.zookeeper.KeeperException;
  import org.apache.zookeeper.KeeperException.Code;
  import org.apache.zookeeper.WatchedEvent;
@@@ -529,21 -526,40 +530,48 @@@ public class ZooCache 
      }
    }
  
+   /**
+    * Removes every cached entry whose path matches the given predicate.
+    *
+    * <p>
+    * The predicate is applied to the key sets of the data cache, the children cache and the
+    * stat cache while the cache write lock is held. Afterwards the immutable snapshot is
+    * rebuilt with an incremented update count. When trace logging is enabled, each path
+    * accepted by the predicate is logged as it is evaluated for removal.
+    *
+    * @param pathPredicate test applied to each cached path; paths for which it returns
+    *        {@code true} are removed
+    * @throws IllegalStateException if this cache has been closed (enforced through
+    *         {@code Preconditions.checkState})
+    */
+   public void clear(Predicate<String> pathPredicate) {
+     Preconditions.checkState(!closed);
+     Predicate<String> pathPredicateToUse;
+     if (log.isTraceEnabled()) {
+       pathPredicateToUse = pathPredicate.and(path -> {
+         log.trace("removing {} from cache", path);
+         return true;
+       });
+     } else {
+       pathPredicateToUse = pathPredicate;
+     }
+     cacheWriteLock.lock();
+     try {
+       cache.keySet().removeIf(pathPredicateToUse);
+       childrenCache.keySet().removeIf(pathPredicateToUse);
+       statCache.keySet().removeIf(pathPredicateToUse);
+ 
+       immutableCache = new ImmutableCacheCopies(++updateCount, cache, 
statCache, childrenCache);
+ 
+     } finally {
+       cacheWriteLock.unlock();
+     }
+   }
+ 
 +  /**
 +   * Looks up the data stored on the first lock node under the given service lock path.
 +   *
 +   * <p>
 +   * The children of {@code path} are passed through {@code ServiceLock.validateAndSort} and
 +   * the first resulting child is treated as the lock node. Its cached data is parsed with
 +   * {@code ServiceLockData.parse}; a node with no data is parsed as an empty byte array.
 +   *
 +   * @param path service lock path whose children are examined
 +   * @return the parsed lock data, or an empty {@code Optional} when the path has no valid
 +   *         children
 +   */
 -  public byte[] getLockData(ServiceLockPath path) {
 +  public Optional<ServiceLockData> getLockData(ServiceLockPath path) {
      List<String> children = ServiceLock.validateAndSort(path, 
getChildren(path.toString()));
      if (children == null || children.isEmpty()) {
 -      return null;
 +      return Optional.empty();
      }
      String lockNode = children.get(0);
 -    return get(path + "/" + lockNode);
 +
 +    byte[] lockData = get(path + "/" + lockNode);
 +    // Normalize a missing value before it is used: new String(null, UTF_8) would throw a
 +    // NullPointerException whenever trace logging is enabled.
 +    if (lockData == null) {
 +      lockData = new byte[0];
 +    }
 +    if (log.isTraceEnabled()) {
 +      log.trace("Data from lockNode {} is {}", lockNode, new String(lockData, 
UTF_8));
 +    }
 +    return ServiceLockData.parse(lockData);
    }
  
  }
diff --cc 
minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index 7ea33e947a,95f8a4ca2b..dcfebbdd8a
--- 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++ 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@@ -52,8 -53,10 +52,9 @@@ import java.util.concurrent.Executors
  import java.util.concurrent.FutureTask;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.TimeoutException;
 -import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicReference;
  import java.util.function.Function;
+ import java.util.function.Predicate;
  import java.util.function.Supplier;
  import java.util.stream.Stream;
  
@@@ -90,9 -91,10 +91,10 @@@ import org.apache.accumulo.server.fs.Vo
  import org.apache.accumulo.server.init.Initialize;
  import org.apache.accumulo.server.util.AccumuloStatus;
  import org.apache.accumulo.server.util.PortUtils;
+ import org.apache.accumulo.server.util.ZooZap;
  import org.apache.accumulo.start.Main;
 -import org.apache.accumulo.start.classloader.vfs.MiniDFSUtil;
  import org.apache.accumulo.start.spi.KeywordExecutable;
 +import org.apache.accumulo.start.util.MiniDFSUtil;
  import org.apache.commons.io.IOUtils;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.CommonConfigurationKeys;
@@@ -783,9 -826,47 +785,47 @@@ public class MiniAccumuloClusterImpl im
  
      control.stop(ServerType.GARBAGE_COLLECTOR, null);
      control.stop(ServerType.MANAGER, null);
+     control.stop(ServerType.COMPACTION_COORDINATOR);
      control.stop(ServerType.TABLET_SERVER, null);
+     control.stop(ServerType.COMPACTOR, null);
+     control.stop(ServerType.SCAN_SERVER, null);
+ 
+     // The method calls above kill the server
+     // Clean up the locks in ZooKeeper so that if the cluster
+     // is restarted, then the processes will start right away
+     // and not wait for the old locks to be cleaned up.
+     try {
+       new ZooZap().zap(getServerContext().getSiteConfiguration(), "-manager",
+           "-compaction-coordinators", "-tservers", "-compactors", 
"-sservers");
+     } catch (RuntimeException e) {
+       log.error("Error zapping zookeeper locks", e);
+     }
      control.stop(ServerType.ZOOKEEPER, null);
  
+     // Clear the location of the servers in ZooCache.
+     // When ZooKeeper was stopped in the previous method call,
+     // the local ZooKeeper watcher did not fire. If MAC is
+     // restarted, then ZooKeeper will start on the same port with
+     // the same data, but no Watchers will fire.
+     boolean startCalled = true;
+     try {
 -      getServerContext();
 -    } catch (RuntimeException e) {
++      getServerContext().getZooKeeperRoot();
++    } catch (IllegalStateException e) {
+       if (e.getMessage().startsWith("Accumulo not initialized")) {
+         startCalled = false;
+       }
+     }
+     if (startCalled) {
+       final ServerContext ctx = getServerContext();
 -      final String zRoot = getServerContext().getZooKeeperRoot();
++      final String zRoot = ctx.getZooKeeperRoot();
+       Predicate<String> pred = path -> false;
+       for (String lockPath : Set.of(Constants.ZMANAGER_LOCK, 
Constants.ZGC_LOCK,
+           Constants.ZCOMPACTORS, Constants.ZSSERVERS, Constants.ZTSERVERS)) {
+         pred = pred.or(path -> path.startsWith(zRoot + lockPath));
+       }
+       ctx.getZooCache().clear(pred);
+     }
+ 
      // ACCUMULO-2985 stop the ExecutorService after we finished using it to 
stop accumulo procs
      if (executor != null) {
        List<Runnable> tasksRemaining = executor.shutdownNow();
diff --cc server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
index ed4a403a3a,d16b9fe984..0c2d6f18b6
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
@@@ -96,101 -111,102 +109,97 @@@ public class ZooZap implements KeywordE
        return;
      }
  
-     var siteConf = SiteConfiguration.auto();
 -    String volDir = 
VolumeConfiguration.getVolumeUris(siteConf).iterator().next();
 -    Path instanceDir = new Path(volDir, "instance_id");
 -    InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new 
Configuration());
 -    ZooReaderWriter zoo = new ZooReaderWriter(siteConf);
 +    try (var zk = new ZooSession(getClass().getSimpleName(), siteConf)) {
 +      // Login as the server on secure HDFS
 +      if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
 +        SecurityUtil.serverLogin(siteConf);
 +      }
  
 -    if (opts.zapMaster) {
 -      log.warn("The -master option is deprecated. Please use -manager 
instead.");
 -    }
 -    if (opts.zapManager || opts.zapMaster) {
 -      String managerLockPath = Constants.ZROOT + "/" + iid + 
Constants.ZMANAGER_LOCK;
 +      String volDir = 
VolumeConfiguration.getVolumeUris(siteConf).iterator().next();
 +      Path instanceDir = new Path(volDir, "instance_id");
 +      InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new 
Configuration());
 +      var zrw = zk.asReaderWriter();
 +
 +      if (opts.zapManager) {
 +        String managerLockPath = ZooUtil.getRoot(iid) + 
Constants.ZMANAGER_LOCK;
  
 -      try {
 -        zapDirectory(zoo, managerLockPath, opts);
 -      } catch (KeeperException | InterruptedException e) {
 -        e.printStackTrace();
 +        try {
 +          zapDirectory(zrw, managerLockPath, opts);
 +        } catch (KeeperException | InterruptedException e) {
 +          e.printStackTrace();
 +        }
        }
 -    }
  
 -    if (opts.zapTservers) {
 -      String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS;
 -      try {
 -        List<String> children = zoo.getChildren(tserversPath);
 -        for (String child : children) {
 -          message("Deleting " + tserversPath + "/" + child + " from 
zookeeper", opts);
 -
 -          if (opts.zapManager || opts.zapMaster) {
 -            zoo.recursiveDelete(tserversPath + "/" + child, 
NodeMissingPolicy.SKIP);
 -          } else {
 -            var zLockPath = ServiceLock.path(tserversPath + "/" + child);
 -            if (!zoo.getChildren(zLockPath.toString()).isEmpty()) {
 -              if (!ServiceLock.deleteLock(zoo, zLockPath, "tserver")) {
 -                message("Did not delete " + tserversPath + "/" + child, opts);
 +      if (opts.zapTservers) {
 +        String tserversPath = ZooUtil.getRoot(iid) + Constants.ZTSERVERS;
 +        try {
 +          List<String> children = zrw.getChildren(tserversPath);
 +          for (String child : children) {
 +            message("Deleting " + tserversPath + "/" + child + " from 
zookeeper", opts);
 +
 +            if (opts.zapManager) {
 +              zrw.recursiveDelete(tserversPath + "/" + child, 
NodeMissingPolicy.SKIP);
 +            } else {
 +              var zLockPath = ServiceLock.path(tserversPath + "/" + child);
 +              if (!zrw.getChildren(zLockPath.toString()).isEmpty()) {
 +                if (!ServiceLock.deleteLock(zrw, zLockPath, "tserver")) {
 +                  message("Did not delete " + tserversPath + "/" + child, 
opts);
 +                }
                }
              }
            }
 +        } catch (KeeperException | InterruptedException e) {
 +          log.error("{}", e.getMessage(), e);
          }
 -      } catch (KeeperException | InterruptedException e) {
 -        log.error("{}", e.getMessage(), e);
        }
 -    }
 -
 -    // Remove the tracers, we don't use them anymore.
 -    @SuppressWarnings("deprecation")
 -    String path = siteConf.get(Property.TRACE_ZK_PATH);
 -    try {
 -      zapDirectory(zoo, path, opts);
 -    } catch (Exception e) {
 -      // do nothing if the /tracers node does not exist.
 -    }
  
 -    if (opts.zapCoordinators) {
 -      final String coordinatorPath = Constants.ZROOT + "/" + iid + 
Constants.ZCOORDINATOR_LOCK;
 -      try {
 -        if (zoo.exists(coordinatorPath)) {
 -          zapDirectory(zoo, coordinatorPath, opts);
 +      if (opts.zapCoordinators) {
 +        final String coordinatorPath = ZooUtil.getRoot(iid) + 
Constants.ZCOORDINATOR_LOCK;
 +        try {
 +          if (zrw.exists(coordinatorPath)) {
 +            zapDirectory(zrw, coordinatorPath, opts);
 +          }
 +        } catch (KeeperException | InterruptedException e) {
 +          log.error("Error deleting coordinator from zookeeper, {}", 
e.getMessage(), e);
          }
 -      } catch (KeeperException | InterruptedException e) {
 -        log.error("Error deleting coordinator from zookeeper, {}", 
e.getMessage(), e);
        }
 -    }
  
 -    if (opts.zapCompactors) {
 -      String compactorsBasepath = Constants.ZROOT + "/" + iid + 
Constants.ZCOMPACTORS;
 -      try {
 -        if (zoo.exists(compactorsBasepath)) {
 -          List<String> queues = zoo.getChildren(compactorsBasepath);
 -          for (String queue : queues) {
 -            message("Deleting " + compactorsBasepath + "/" + queue + " from 
zookeeper", opts);
 -            zoo.recursiveDelete(compactorsBasepath + "/" + queue, 
NodeMissingPolicy.SKIP);
 +      if (opts.zapCompactors) {
 +        String compactorsBasepath = ZooUtil.getRoot(iid) + 
Constants.ZCOMPACTORS;
 +        try {
 +          if (zrw.exists(compactorsBasepath)) {
 +            List<String> queues = zrw.getChildren(compactorsBasepath);
 +            for (String queue : queues) {
 +              message("Deleting " + compactorsBasepath + "/" + queue + " from 
zookeeper", opts);
 +              zrw.recursiveDelete(compactorsBasepath + "/" + queue, 
NodeMissingPolicy.SKIP);
 +            }
            }
 +        } catch (KeeperException | InterruptedException e) {
 +          log.error("Error deleting compactors from zookeeper, {}", 
e.getMessage(), e);
          }
 -      } catch (KeeperException | InterruptedException e) {
 -        log.error("Error deleting compactors from zookeeper, {}", 
e.getMessage(), e);
 -      }
 -
 -    }
  
 -    if (opts.zapScanServers) {
 -      String sserversPath = Constants.ZROOT + "/" + iid + Constants.ZSSERVERS;
 -      try {
 -        if (zoo.exists(sserversPath)) {
 -          List<String> children = zoo.getChildren(sserversPath);
 -          for (String child : children) {
 -            message("Deleting " + sserversPath + "/" + child + " from 
zookeeper", opts);
 +      }
  
 -            var zLockPath = ServiceLock.path(sserversPath + "/" + child);
 -            if (!zoo.getChildren(zLockPath.toString()).isEmpty()) {
 -              ServiceLock.deleteLock(zoo, zLockPath);
 +      if (opts.zapScanServers) {
 +        String sserversPath = ZooUtil.getRoot(iid) + Constants.ZSSERVERS;
 +        try {
 +          if (zrw.exists(sserversPath)) {
 +            List<String> children = zrw.getChildren(sserversPath);
 +            for (String child : children) {
 +              message("Deleting " + sserversPath + "/" + child + " from 
zookeeper", opts);
 +
 +              var zLockPath = ServiceLock.path(sserversPath + "/" + child);
 +              if (!zrw.getChildren(zLockPath.toString()).isEmpty()) {
 +                ServiceLock.deleteLock(zrw, zLockPath);
 +              }
              }
            }
 +        } catch (KeeperException | InterruptedException e) {
 +          log.error("{}", e.getMessage(), e);
          }
 -      } catch (KeeperException | InterruptedException e) {
 -        log.error("{}", e.getMessage(), e);
        }
 -    }
  
-     } finally {
-       SingletonManager.setMode(Mode.CLOSED);
 +    }
- 
    }
  
    private static void zapDirectory(ZooReaderWriter zoo, String path, Opts 
opts)

Reply via email to