This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 98cbd85f856dca0cbbc6e4fbd6a4febded2e03cb Merge: 6636feab60 7b1182192d Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed Jan 15 21:16:15 2025 +0000 Merge branch '2.1' into 3.1 .../accumulo/core/fate/zookeeper/ZooCache.java | 28 +++++++++++++++ .../MiniAccumuloClusterControl.java | 6 ++++ .../miniclusterImpl/MiniAccumuloClusterImpl.java | 40 ++++++++++++++++++++++ .../org/apache/accumulo/server/util/ZooZap.java | 17 ++++++--- 4 files changed, 87 insertions(+), 4 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java index e24448585e,d7f924f576..a490a03cfd --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java @@@ -32,12 -30,9 +32,13 @@@ import java.util.concurrent.locks.Lock import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + import java.util.function.Predicate; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath; +import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.zookeeper.ZooSession; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.WatchedEvent; @@@ -529,21 -526,40 +530,48 @@@ public class ZooCache } } + /** + * Removes all paths in the cache match the predicate. + */ + public void clear(Predicate<String> pathPredicate) { + Preconditions.checkState(!closed); + Predicate<String> pathPredicateToUse; + if (log.isTraceEnabled()) { + pathPredicateToUse = pathPredicate.and(path -> { + log.trace("removing {} from cache", path); + return true; + }); + } else { + pathPredicateToUse = pathPredicate; + } + cacheWriteLock.lock(); + try { + cache.keySet().removeIf(pathPredicateToUse); + childrenCache.keySet().removeIf(pathPredicateToUse); + statCache.keySet().removeIf(pathPredicateToUse); + + immutableCache = new ImmutableCacheCopies(++updateCount, cache, statCache, childrenCache); + + } finally { + cacheWriteLock.unlock(); + } + } + - public byte[] getLockData(ServiceLockPath path) { + public Optional<ServiceLockData> getLockData(ServiceLockPath path) { List<String> children = ServiceLock.validateAndSort(path, getChildren(path.toString())); if (children == null || children.isEmpty()) { - return null; + return Optional.empty(); } String lockNode = children.get(0); - return get(path + "/" + lockNode); + + byte[] lockData = get(path + "/" + lockNode); + if (log.isTraceEnabled()) { + log.trace("Data from lockNode {} is {}", lockNode, new String(lockData, UTF_8)); + } + if (lockData == null) { + lockData = new byte[0]; + } + return ServiceLockData.parse(lockData); } } diff --cc minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index 7ea33e947a,95f8a4ca2b..dcfebbdd8a --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@@ -52,8 -53,10 +52,9 @@@ import java.util.concurrent.Executors import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; + import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Stream; @@@ -90,9 -91,10 +91,10 @@@ import org.apache.accumulo.server.fs.Vo import org.apache.accumulo.server.init.Initialize; import org.apache.accumulo.server.util.AccumuloStatus; import org.apache.accumulo.server.util.PortUtils; + import org.apache.accumulo.server.util.ZooZap; import org.apache.accumulo.start.Main; -import org.apache.accumulo.start.classloader.vfs.MiniDFSUtil; import org.apache.accumulo.start.spi.KeywordExecutable; +import org.apache.accumulo.start.util.MiniDFSUtil; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; @@@ -783,9 -826,47 +785,47 @@@ public class MiniAccumuloClusterImpl im control.stop(ServerType.GARBAGE_COLLECTOR, null); control.stop(ServerType.MANAGER, null); + control.stop(ServerType.COMPACTION_COORDINATOR); control.stop(ServerType.TABLET_SERVER, null); + control.stop(ServerType.COMPACTOR, null); + control.stop(ServerType.SCAN_SERVER, null); + + // The method calls above kill the server + // Clean up the locks in ZooKeeper fo that if the cluster + // is restarted, then the processes will start right away + // and not wait for the old locks to be cleaned up. + try { + new ZooZap().zap(getServerContext().getSiteConfiguration(), "-manager", + "-compaction-coordinators", "-tservers", "-compactors", "-sservers"); + } catch (RuntimeException e) { + log.error("Error zapping zookeeper locks", e); + } control.stop(ServerType.ZOOKEEPER, null); + // Clear the location of the servers in ZooCache. + // When ZooKeeper was stopped in the previous method call, + // the local ZooKeeper watcher did not fire. If MAC is + // restarted, then ZooKeeper will start on the same port with + // the same data, but no Watchers will fire. + boolean startCalled = true; + try { - getServerContext(); - } catch (RuntimeException e) { ++ getServerContext().getZooKeeperRoot(); ++ } catch (IllegalStateException e) { + if (e.getMessage().startsWith("Accumulo not initialized")) { + startCalled = false; + } + } + if (startCalled) { + final ServerContext ctx = getServerContext(); - final String zRoot = getServerContext().getZooKeeperRoot(); ++ final String zRoot = ctx.getZooKeeperRoot(); + Predicate<String> pred = path -> false; + for (String lockPath : Set.of(Constants.ZMANAGER_LOCK, Constants.ZGC_LOCK, + Constants.ZCOMPACTORS, Constants.ZSSERVERS, Constants.ZTSERVERS)) { + pred = pred.or(path -> path.startsWith(zRoot + lockPath)); + } + ctx.getZooCache().clear(pred); + } + // ACCUMULO-2985 stop the ExecutorService after we finished using it to stop accumulo procs if (executor != null) { List<Runnable> tasksRemaining = executor.shutdownNow(); diff --cc server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java index ed4a403a3a,d16b9fe984..0c2d6f18b6 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java @@@ -96,101 -111,102 +109,97 @@@ public class ZooZap implements KeywordE return; } - var siteConf = SiteConfiguration.auto(); - String volDir = VolumeConfiguration.getVolumeUris(siteConf).iterator().next(); - Path instanceDir = new Path(volDir, "instance_id"); - InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new Configuration()); - ZooReaderWriter zoo = new ZooReaderWriter(siteConf); + try (var zk = new ZooSession(getClass().getSimpleName(), siteConf)) { + // Login as the server on secure HDFS + if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + SecurityUtil.serverLogin(siteConf); + } - if (opts.zapMaster) { - log.warn("The -master option is deprecated. Please use -manager instead."); - } - if (opts.zapManager || opts.zapMaster) { - String managerLockPath = Constants.ZROOT + "/" + iid + Constants.ZMANAGER_LOCK; + String volDir = VolumeConfiguration.getVolumeUris(siteConf).iterator().next(); + Path instanceDir = new Path(volDir, "instance_id"); + InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new Configuration()); + var zrw = zk.asReaderWriter(); + + if (opts.zapManager) { + String managerLockPath = ZooUtil.getRoot(iid) + Constants.ZMANAGER_LOCK; - try { - zapDirectory(zoo, managerLockPath, opts); - } catch (KeeperException | InterruptedException e) { - e.printStackTrace(); + try { + zapDirectory(zrw, managerLockPath, opts); + } catch (KeeperException | InterruptedException e) { + e.printStackTrace(); + } } - } - if (opts.zapTservers) { - String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS; - try { - List<String> children = zoo.getChildren(tserversPath); - for (String child : children) { - message("Deleting " + tserversPath + "/" + child + " from zookeeper", opts); - - if (opts.zapManager || opts.zapMaster) { - zoo.recursiveDelete(tserversPath + "/" + child, NodeMissingPolicy.SKIP); - } else { - var zLockPath = ServiceLock.path(tserversPath + "/" + child); - if (!zoo.getChildren(zLockPath.toString()).isEmpty()) { - if (!ServiceLock.deleteLock(zoo, zLockPath, "tserver")) { - message("Did not delete " + tserversPath + "/" + child, opts); + if (opts.zapTservers) { + String tserversPath = ZooUtil.getRoot(iid) + Constants.ZTSERVERS; + try { + List<String> children = zrw.getChildren(tserversPath); + for (String child : children) { + message("Deleting " + tserversPath + "/" + child + " from zookeeper", opts); + + if (opts.zapManager) { + zrw.recursiveDelete(tserversPath + "/" + child, NodeMissingPolicy.SKIP); + } else { + var zLockPath = ServiceLock.path(tserversPath + "/" + child); + if (!zrw.getChildren(zLockPath.toString()).isEmpty()) { + if (!ServiceLock.deleteLock(zrw, zLockPath, "tserver")) { + message("Did not delete " + tserversPath + "/" + child, opts); + } } } } + } catch (KeeperException | InterruptedException e) { + log.error("{}", e.getMessage(), e); } - } catch (KeeperException | InterruptedException e) { - log.error("{}", e.getMessage(), e); } - } - - // Remove the tracers, we don't use them anymore. - @SuppressWarnings("deprecation") - String path = siteConf.get(Property.TRACE_ZK_PATH); - try { - zapDirectory(zoo, path, opts); - } catch (Exception e) { - // do nothing if the /tracers node does not exist. - } - if (opts.zapCoordinators) { - final String coordinatorPath = Constants.ZROOT + "/" + iid + Constants.ZCOORDINATOR_LOCK; - try { - if (zoo.exists(coordinatorPath)) { - zapDirectory(zoo, coordinatorPath, opts); + if (opts.zapCoordinators) { + final String coordinatorPath = ZooUtil.getRoot(iid) + Constants.ZCOORDINATOR_LOCK; + try { + if (zrw.exists(coordinatorPath)) { + zapDirectory(zrw, coordinatorPath, opts); + } + } catch (KeeperException | InterruptedException e) { + log.error("Error deleting coordinator from zookeeper, {}", e.getMessage(), e); } - } catch (KeeperException | InterruptedException e) { - log.error("Error deleting coordinator from zookeeper, {}", e.getMessage(), e); } - } - if (opts.zapCompactors) { - String compactorsBasepath = Constants.ZROOT + "/" + iid + Constants.ZCOMPACTORS; - try { - if (zoo.exists(compactorsBasepath)) { - List<String> queues = zoo.getChildren(compactorsBasepath); - for (String queue : queues) { - message("Deleting " + compactorsBasepath + "/" + queue + " from zookeeper", opts); - zoo.recursiveDelete(compactorsBasepath + "/" + queue, NodeMissingPolicy.SKIP); + if (opts.zapCompactors) { + String compactorsBasepath = ZooUtil.getRoot(iid) + Constants.ZCOMPACTORS; + try { + if (zrw.exists(compactorsBasepath)) { + List<String> queues = zrw.getChildren(compactorsBasepath); + for (String queue : queues) { + message("Deleting " + compactorsBasepath + "/" + queue + " from zookeeper", opts); + zrw.recursiveDelete(compactorsBasepath + "/" + queue, NodeMissingPolicy.SKIP); + } } + } catch (KeeperException | InterruptedException e) { + log.error("Error deleting compactors from zookeeper, {}", e.getMessage(), e); } - } catch (KeeperException | InterruptedException e) { - log.error("Error deleting compactors from zookeeper, {}", e.getMessage(), e); - } - - } - if (opts.zapScanServers) { - String sserversPath = Constants.ZROOT + "/" + iid + Constants.ZSSERVERS; - try { - if (zoo.exists(sserversPath)) { - List<String> children = zoo.getChildren(sserversPath); - for (String child : children) { - message("Deleting " + sserversPath + "/" + child + " from zookeeper", opts); + } - var zLockPath = ServiceLock.path(sserversPath + "/" + child); - if (!zoo.getChildren(zLockPath.toString()).isEmpty()) { - ServiceLock.deleteLock(zoo, zLockPath); + if (opts.zapScanServers) { + String sserversPath = ZooUtil.getRoot(iid) + Constants.ZSSERVERS; + try { + if (zrw.exists(sserversPath)) { + List<String> children = zrw.getChildren(sserversPath); + for (String child : children) { + message("Deleting " + sserversPath + "/" + child + " from zookeeper", opts); + + var zLockPath = ServiceLock.path(sserversPath + "/" + child); + if (!zrw.getChildren(zLockPath.toString()).isEmpty()) { + ServiceLock.deleteLock(zrw, zLockPath); + } } } + } catch (KeeperException | InterruptedException e) { + log.error("{}", e.getMessage(), e); } - } catch (KeeperException | InterruptedException e) { - log.error("{}", e.getMessage(), e); } - } - } finally { - SingletonManager.setMode(Mode.CLOSED); + } - } private static void zapDirectory(ZooReaderWriter zoo, String path, Opts opts)