This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 77078f321fb492c0d80499973df04b386282e06f Merge: a516685f59 48953e404d Author: Keith Turner <[email protected]> AuthorDate: Fri May 16 16:06:53 2025 +0000 Merge branch '2.1' assemble/bin/accumulo-cluster | 193 ++++++++++++++++++++- .../miniclusterImpl/MiniAccumuloClusterImpl.java | 3 +- .../apache/accumulo/server/util/UpgradeUtil.java | 3 +- .../org/apache/accumulo/server/util/ZooZap.java | 101 +++++++++-- 4 files changed, 277 insertions(+), 23 deletions(-) diff --cc assemble/bin/accumulo-cluster index a2d18a5a8f,0d747015ed..06e63a62d2 --- a/assemble/bin/accumulo-cluster +++ b/assemble/bin/accumulo-cluster @@@ -52,7 -54,17 +52,8 @@@ $(cyan Commands) $(green stop) Stops Accumulo cluster services $(green restart) Restarts Accumulo cluster services $(green kill) Kills Accumulo cluster services + $(green prune) Removes zookeeper locks of extra processes - $(cyan Deprecated commands): - $(green start-non-tservers) $(yellow Deprecated). Alias for "start --no-tservers" - $(green start-servers) $(yellow Deprecated). Alias for "start" - $(green stop-servers) $(yellow Deprecated). Alias for "stop" - $(green start-tservers) $(yellow Deprecated). Alias for "start --tservers" - $(green stop-tservers) $(yellow Deprecated). Alias for "stop --tservers" - $(green start-here) $(yellow Deprecated). Alias for "start --local" - $(green stop-here) $(yellow Deprecated). Alias for "stop --local" - $(cyan Examples): $(purple 'accumulo-cluster start') $(blue '# start all servers') $(purple 'accumulo-cluster start --dry-run') $(blue '# print debug information and commands to be executed') @@@ -589,10 -653,177 +598,184 @@@ function control_services() fi if [[ $ARG_LOCAL == 0 && $ARG_ALL == 1 && ($operation == "stop" || $operation == "kill") ]]; then - if ! 
isDebug; then - echo "Cleaning all server entries in ZooKeeper" - "$accumulo_cmd" org.apache.accumulo.server.util.ZooZap -manager -tservers -compaction-coordinators -compactors -sservers --gc --monitor - fi + debug "Cleaning all server entries in ZooKeeper" - debugOrRun "$accumulo_cmd" org.apache.accumulo.server.util.ZooZap -verbose -manager -tservers -compactors -sservers ++ debugOrRun "$accumulo_cmd" org.apache.accumulo.server.util.ZooZap -verbose -manager -tservers -compactors -sservers --gc --monitor + fi + + } + + function prune_group() { + local service_type=$1 + local group=$2 + local expectedCount=$3 + declare -a hosts + read -r -a hosts <<<"$4" + + if isDebug; then + echo "$(blue DEBUG) starting prune for service:$service_type group:$group expected:$expectedCount" + fi + + if [ -z ${AC_TMP_DIR+x} ]; then + echo "$(red ERROR): AC_TMP_DIR is not set" + exit 1 + fi + local exclude_file="$AC_TMP_DIR/accumulo-zoozap-exclude-$service_type-$group.txt" + touch "$exclude_file" + + # Determine the host:ports known by the accumulo cluster script, these should be kept + for host in "${hosts[@]}"; do + "${SSH[@]}" "$host" bash -c "'$bin/accumulo-service $service_type list'" | grep -E "^[a-zA-Z0-9]+_${group}_[0-9]+" | head -n "$expectedCount" | awk '{print $3}' | tr ',' '\n' | awk '{print "'"$host"':" $1}' >>"$exclude_file" + done + + local lockTypeOpt + case $service_type in + manager) + lockTypeOpt="-manager" + ;; + compaction-coordinator) + lockTypeOpt="-compaction-coordinators" + ;; + compactor) + lockTypeOpt="-compactors" + ;; + tserver) + lockTypeOpt="-tservers" + ;; + sserver) + lockTypeOpt="-sservers" + ;; + gc) + lockTypeOpt="--gc" + ;; + monitor) + lockTypeOpt="--monitor" + ;; + *) + echo "Prune does not support $service_type" + exit 1 + ;; + esac + + if isDebug; then + "$accumulo_cmd" org.apache.accumulo.server.util.ZooZap "$lockTypeOpt" -verbose --include-groups "$group" --exclude-host-ports "$exclude_file" --dry-run + else + "$accumulo_cmd" 
org.apache.accumulo.server.util.ZooZap "$lockTypeOpt" -verbose --include-groups "$group" --exclude-host-ports "$exclude_file" + fi + } + + # Kills extra server processes that are not needed according to the + # cluster.yaml file. Conceptually this code is trying to reconcile the + # following three sets of servers. + # + # 1. The notional goal set of servers specified by cluster.yaml + # 2. The set of server processes seen in zookeeper + # 3. The set of server processes known to the accumulo-cluster script. This + # is derived from pid files on hosts in set 1. + # + # This function attempts to find extra servers in set 2 that are not specified + # by set 1. When it does find extra servers it removes their zookeeper locks + # avoiding removing locks of servers in set 3. The following are different + # situations the code will see and handle. + # + # * When a host is not in cluster.yaml but has some processes listed in + # zookeeper. For this case all of the processes with that host can be killed. + # * When a resource group is not in cluster.yaml but has some processes listed + # in zookeeper. For this case all of the processes with that resource group + # can be killed. + # * When a host is in cluster.yaml with a target of 3 processes but has 6 + # processes listed in zookeeper. For this case we want to kill 3 processes that + # do not have pid files on the host. + # + function prune() { + if [[ $ARG_LOCAL == 1 ]]; then + # Currently the code is structured to remove all extra servers in a single resource group. Finer granularity is not supported. + echo "$(red ERROR): Prune does not support running locally" + exit 1 + fi + + if ! jq -h >&/dev/null; then + echo "$(red ERROR:) Missing $(green jq). Unable to continue." 
+ exit 1 + fi + + if [[ -z ${AC_TMP_DIR+x} ]]; then + echo "AC_TMP_DIR is not set" + exit 1 + fi + local service_json="$AC_TMP_DIR/accumulo-service.json" + "$accumulo_cmd" admin serviceStatus --json >"$service_json" 2>/dev/null || exit 1 + + local var_name + local hosts + declare -a groups + + local manager + if [[ $ARG_ALL == 1 || $ARG_MANAGER == 1 ]]; then + prune_group "manager" "default" "1" "$MANAGER_HOSTS" + fi + + if [[ $ARG_ALL == 1 || $ARG_GC == 1 ]]; then + prune_group "gc" "default" "1" "$GC_HOSTS" fi + if [[ $ARG_ALL == 1 || $ARG_MONITOR == 1 ]]; then + prune_group "monitor" "default" "1" "$MONITOR_HOSTS" + fi + - if [[ $ARG_ALL == 1 || $ARG_COORDINATOR == 1 ]]; then - prune_group "compaction-coordinator" "default" "1" "$COORDINATOR_HOSTS" - fi - + if [[ $ARG_ALL == 1 || $ARG_TSERVER == 1 ]]; then - #TODO in main need to adapt to having RGs for tservers - prune_group "tserver" "default" "$TSERVERS_PER_HOST_default" "$TSERVER_HOSTS_default" ++ groups=() ++ if [[ -n $ARG_TSERVER_GROUP ]]; then ++ read -r -a groups <<<"$ARG_TSERVER_GROUP" ++ else ++ # find all groups known in zookeeper, this will allow pruning entire groups that do not even exist in cluster.yaml ++ readarray -t groups < <(jq -r ".summaries.T_SERVER.resourceGroups | .[] " "$service_json") ++ fi ++ ++ for group in "${groups[@]}"; do ++ var_name="TSERVERS_PER_HOST_$group" ++ local expected=${!var_name:-0} ++ ++ hosts="TSERVER_HOSTS_$group" ++ prune_group "tserver" "$group" "$expected" "${!hosts}" ++ done + fi + + if [[ $ARG_ALL == 1 || $ARG_SSERVER == 1 ]]; then + groups=() + if [[ -n $ARG_SSERVER_GROUP ]]; then + read -r -a groups <<<"$ARG_SSERVER_GROUP" + else + # find all groups known in zookeeper, this will allow pruning entire groups that do not even exist in cluster.yaml + readarray -t groups < <(jq -r ".summaries.S_SERVER.resourceGroups | .[] " "$service_json") + fi + + for group in "${groups[@]}"; do + var_name="SSERVERS_PER_HOST_$group" + local expected=${!var_name:-0} + + 
hosts="SSERVER_HOSTS_$group" + prune_group "sserver" "$group" "$expected" "${!hosts}" + done + + fi + + if [[ $ARG_ALL == 1 || $ARG_COMPACTOR == 1 ]]; then + groups=() + if [[ -n $ARG_COMPACTOR_GROUP ]]; then + read -r -a groups <<<"$ARG_COMPACTOR_GROUP" + else + # find all groups known in zookeeper, this will allow pruning entire groups that do not even exist in cluster.yaml + readarray -t groups < <(jq -r ".summaries.COMPACTOR.resourceGroups | .[] " "$service_json") + fi + + for group in "${groups[@]}"; do + var_name="COMPACTORS_PER_HOST_$group" + local expected=${!var_name:-0} + + hosts="COMPACTOR_HOSTS_$group" + prune_group "compactor" "$group" "$expected" "${!hosts}" + done + fi } function main() { @@@ -674,6 -919,54 +857,10 @@@ EO parse_config control_services kill ;; - start-here) - parse_config - ARG_ALL=1 - ARG_LOCAL=1 - control_services start - ;; - stop-here) - parse_config - ARG_ALL=1 - ARG_LOCAL=1 - control_services stop - control_services kill - ;; + prune) + parse_config + prune + ;; - start-non-tservers) - echo "'$ARG_CMD' is deprecated. Please specify the servers you wish to start instead" - parse_config - ARG_MANAGER=1 - ARG_GC=1 - ARG_MONITOR=1 - ARG_COORDINATOR=1 - ARG_SSERVER=1 - ARG_COMPACTOR=1 - control_services start - ;; - start-servers) - echo "'$ARG_CMD' is deprecated. Please use 'start' instead" - parse_config - control_services start - ;; - stop-servers) - echo "'$ARG_CMD' is deprecated. Please use 'stop' instead" - parse_config - control_services stop - ;; - start-tservers) - echo "'$ARG_CMD' is deprecated. Please use 'start --tservers' instead" - ARG_TSERVER=1 - control_services start - ;; - stop-tservers) - echo "'$ARG_CMD' is deprecated. 
Please use 'stop --tservers' instead" - ARG_TSERVER=1 - control_services stop - ;; *) invalid_args "'$ARG_CMD' is an invalid <command>" ;; diff --cc minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index c2dc708cdc,0f3b8bae07..471c699400 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@@ -984,27 -837,36 +984,28 @@@ public class MiniAccumuloClusterImpl im // is restarted, then the processes will start right away // and not wait for the old locks to be cleaned up. try { - new ZooZap().zap(getServerContext(), "-manager", "-tservers", "-compactors", "-sservers"); - new ZooZap().zap(getServerContext().getSiteConfiguration(), "-manager", - "-compaction-coordinators", "-tservers", "-compactors", "-sservers", "--gc"); ++ new ZooZap().zap(getServerContext(), "-manager", "-tservers", "-compactors", "-sservers", ++ "--gc"); } catch (RuntimeException e) { - log.error("Error zapping zookeeper locks", e); + if (!e.getMessage().startsWith("Accumulo not initialized")) { + log.error("Error zapping zookeeper locks", e); + } } - control.stop(ServerType.ZOOKEEPER, null); // Clear the location of the servers in ZooCache. - // When ZooKeeper was stopped in the previous method call, - // the local ZooKeeper watcher did not fire. If MAC is - // restarted, then ZooKeeper will start on the same port with - // the same data, but no Watchers will fire. 
- boolean startCalled = true; + boolean macStarted = false; try { - getServerContext(); - } catch (RuntimeException e) { - if (e.getMessage().startsWith("Accumulo not initialized")) { - startCalled = false; + ZooUtil.getRoot(getServerContext().getInstanceID()); + macStarted = true; + } catch (IllegalStateException e) { + if (!e.getMessage().startsWith("Accumulo not initialized")) { + throw e; } } - if (startCalled) { - final ServerContext ctx = getServerContext(); - final String zRoot = getServerContext().getZooKeeperRoot(); - Predicate<String> pred = path -> false; - for (String lockPath : Set.of(Constants.ZMANAGER_LOCK, Constants.ZGC_LOCK, - Constants.ZCOMPACTORS, Constants.ZSSERVERS, Constants.ZTSERVERS)) { - pred = pred.or(path -> path.startsWith(zRoot + lockPath)); - } - ctx.getZooCache().clear(pred); + if (macStarted) { + getServerContext().getZooCache().clear(path -> path.startsWith("/")); } + control.stop(ServerType.ZOOKEEPER, null); // ACCUMULO-2985 stop the ExecutorService after we finished using it to stop accumulo procs if (executor != null) { diff --cc server/base/src/main/java/org/apache/accumulo/server/util/UpgradeUtil.java index 73baf9f3d7,261377a742..86abc4a42c --- a/server/base/src/main/java/org/apache/accumulo/server/util/UpgradeUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/UpgradeUtil.java @@@ -79,150 -60,63 +79,151 @@@ public class UpgradeUtil implements Key @Override public String description() { - return "utility used to perform various upgrade steps for an Accumulo instance"; + return "utility used to perform various upgrade steps for an Accumulo instance. The 'prepare'" + + " step is intended to be run using the old version of software after an instance has" + + " been shut down. The 'start' step is intended to be run on the instance with the new" + + " version of software. Server processes should fail to start after the 'prepare' step" + + " has been run due to the existence of a node in ZooKeeper. 
When the 'start' step" + + " completes successfully it will remove this node allowing the user to start the" + + " Manager to complete the instance upgrade process."; } - @Override - public void execute(String[] args) throws Exception { - Opts opts = new Opts(); - opts.parseArgs(keyword(), args); + private void prepare(final ServerContext context) { - if (!opts.prepare) { - new JCommander(opts).usage(); - return; + final int persistentVersion = AccumuloDataVersion.getCurrentVersion(context); + final int thisVersion = AccumuloDataVersion.get(); + if (persistentVersion != thisVersion) { + throw new IllegalStateException("It looks like you are running 'prepare' with " + + "a different version of software than what the instance was running with." + + " The 'prepare' command is intended to be run after an instance is shutdown" + + " with the same version of software before trying to upgrade."); } - var siteConf = SiteConfiguration.auto(); - // Login as the server on secure HDFS - if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { - SecurityUtil.serverLogin(siteConf); + final ZooSession zs = context.getZooSession(); + final ZooReaderWriter zoo = zs.asReaderWriter(); + + try { + zoo.delete(ZPREPARE_FOR_UPGRADE); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException( + "Error deleting " + ZPREPARE_FOR_UPGRADE + " node in zookeeper", e); } - String volDir = VolumeConfiguration.getVolumeUris(siteConf).iterator().next(); - Path instanceDir = new Path(volDir, "instance_id"); - InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new Configuration()); - ZooReaderWriter zoo = new ZooReaderWriter(siteConf); + LOG.info("Upgrade specified, validating that Manager is stopped"); + if (context.getServerPaths().getManager(true) != null) { + throw new IllegalStateException("Manager is running, shut it down and retry this operation"); + } - if (opts.prepare) { - final String zUpgradepath = Constants.ZROOT + "/" + iid + 
Constants.ZPREPARE_FOR_UPGRADE; - try { - if (zoo.exists(zUpgradepath)) { - zoo.delete(zUpgradepath); - } - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException("Error creating or checking for " + zUpgradepath - + " node in zookeeper: " + e.getMessage(), e); + LOG.info("Checking for existing fate transactions"); + try { + // Adapted from UpgradeCoordinator.abortIfFateTransactions + // TODO: After the 4.0.0 release this code block needs to be + // modified to account for the new Fate table. + if (!zoo.getChildren(ZFATE).isEmpty()) { + throw new IllegalStateException("Cannot complete upgrade preparation" + + " because FATE transactions exist. You can start a tserver, but" + + " not the Manager, then use the shell to delete completed" + + " transactions and fail pending or in-progress transactions." + + " Once all of the FATE transactions have been removed you can" + + " retry this operation."); } + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Error checking for existing FATE transactions", e); + } - LOG.info("Upgrade specified, validating that Manager is stopped"); - final ServiceLockPath mgrPath = - ServiceLock.path(Constants.ZROOT + "/" + iid + Constants.ZMANAGER_LOCK); - try { - if (ServiceLock.getLockData(zoo.getZooKeeper(), mgrPath) != null) { + LOG.info("Creating {} node in zookeeper, servers will be prevented from" + + " starting while this node exists", ZPREPARE_FOR_UPGRADE); + try { + zoo.putPersistentData(ZPREPARE_FOR_UPGRADE, new byte[0], NodeExistsPolicy.SKIP); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Error creating " + ZPREPARE_FOR_UPGRADE + + " node in zookeeper. 
Check for any issues and retry.", e); + } + + LOG.info("Forcing removal of all server locks"); - new ZooZap().zap(context, "-manager", "-tservers", "-compactors", "-sservers"); ++ new ZooZap().zap(context, "-manager", "-tservers", "-compactors", "-sservers", "--monitor", ++ "--gc"); + + LOG.info( + "Instance {} prepared for upgrade. Server processes will not start while" + + " in this state. To undo this state and abort upgrade preparations delete" + + " the zookeeper node: {}. If you abort and restart the instance, then you " + + " should re-run this utility before upgrading.", + context.getInstanceID(), ZPREPARE_FOR_UPGRADE); + } + + private void start(ServerContext context, boolean force) { + final int persistentVersion = AccumuloDataVersion.getCurrentVersion(context); + final int thisVersion = AccumuloDataVersion.get(); + if (persistentVersion == thisVersion) { + throw new IllegalStateException("Running this utility is unnecessary, this instance" + + " has already been upgraded to version " + thisVersion); + } + + if (context.getServerPaths().getManager(true) != null) { + throw new IllegalStateException("Cannot run this command with the Manager running."); + } + + final ZooSession zs = context.getZooSession(); + final ZooReader zr = zs.asReader(); + + // Check to see if the 'start' command has successfully run before. If it has, + // and the Manager made any progress in upgrading, then fail. + try { + if (zr.exists(Constants.ZUPGRADE_PROGRESS)) { + int persistedVersion = AccumuloDataVersion.getCurrentVersion(context); + byte[] bytes = zr.getData(Constants.ZUPGRADE_PROGRESS, new Stat()); + UpgradeProgress progress = UpgradeProgress.fromJsonBytes(bytes); + if (progress.getZooKeeperVersion() != persistedVersion + || progress.getRootVersion() != persistedVersion + || progress.getMetadataVersion() != persistedVersion) { throw new IllegalStateException( - "Manager is running, shut it down and retry this operation"); + "It appears that an upgrade is in progress. 
'accumulo upgrade --start'" + + " cannot be run again"); + } else { + ZooUtil.recursiveDelete(zs, Constants.ZUPGRADE_PROGRESS, NodeMissingPolicy.FAIL); } - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException("Error trying to determine if Manager lock is held", e); + } + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(Constants.ZUPGRADE_PROGRESS + " node exists" + + " in ZooKeeper implying the 'start' command is being re-run. Deleting" + + " this node has failed. Delete it manually before retrying.", e); + } + + final String prepUpgradePath = Constants.ZPREPARE_FOR_UPGRADE; + boolean prepareNodeExists = false; + try { + prepareNodeExists = zr.exists(prepUpgradePath); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Error checking for existence of node: " + prepUpgradePath, + e); + } + + if (!prepareNodeExists) { + + if (force) { + LOG.info("{} node not found in ZooKeeper, 'accumulo upgrade --prepare' was likely" + + " not run after shutting down instance for upgrade. Removing" + + " server locks and checking for fate transactions.", prepUpgradePath); + } else { + throw new IllegalStateException(prepUpgradePath + " node not found in ZooKeeper indicating" + + " that 'accumulo upgrade --prepare' was not run after shutting down the instance. If" + + " you wish to continue, then run this command using the --force option. If you wish" + + " to cancel, delete, or let your Fate transactions complete, then restart the instance" + + " with the old version of software."); } - LOG.info("Checking for existing fate transactions"); try { // Adapted from UpgradeCoordinator.abortIfFateTransactions - if (!zoo.getChildren(Constants.ZROOT + "/" + iid + Constants.ZFATE).isEmpty()) { - throw new IllegalStateException("Cannot complete upgrade preparation" + // TODO: After the 4.0.0 release this code block needs to be + // modified to account for the new Fate table. 
+ if (!zr.getChildren(Constants.ZFATE).isEmpty()) { + throw new IllegalStateException("Cannot continue pre-upgrade checks" + " because FATE transactions exist. You can start a tserver, but" - + " not the Manager, then use the shell to delete completed" - + " transactions and fail pending or in-progress transactions." - + " Once all of the FATE transactions have been removed you can" - + " retry this operation."); + + " not the Manager, with the old version of Accumulo then use " + + " the shell to delete completed transactions and fail pending" + + " or in-progress transactions. Once all of the FATE transactions" + + " have been removed you can retry this operation."); } } catch (KeeperException | InterruptedException e) { throw new IllegalStateException("Error checking for existing FATE transactions", e); diff --cc server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java index 16e13a84b3,b9484176c6..9b0c40d848 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java @@@ -18,10 -18,17 +18,16 @@@ */ package org.apache.accumulo.server.util; -import static java.nio.charset.StandardCharsets.UTF_8; - + import java.io.IOException; + import java.io.UncheckedIOException; + import java.nio.file.Files; + import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; + import java.util.function.Predicate; + import java.util.stream.Collectors; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.Help; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; @@@ -40,6 -52,6 +46,7 @@@ import org.slf4j.LoggerFactory import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.google.auto.service.AutoService; ++import com.google.common.net.HostAndPort; @AutoService(KeywordExecutable.class) public class ZooZap implements KeywordExecutable { 
@@@ -66,9 -82,9 +73,6 @@@ boolean zapManager = false; @Parameter(names = "-tservers", description = "remove tablet server locks") boolean zapTservers = false; - @Parameter(names = "-group", description = "limit the zap to a specific resource group", - arity = 1) - String resourceGroup = ""; - @Parameter(names = "-compaction-coordinators", - description = "remove compaction coordinator locks") - boolean zapCoordinators = false; @Parameter(names = "-compactors", description = "remove compactor locks") boolean zapCompactors = false; @Parameter(names = "-sservers", description = "remove scan server locks") @@@ -97,85 -128,179 +114,133 @@@ Opts opts = new Opts(); opts.parseArgs(keyword(), args); - if (!opts.zapManager && !opts.zapTservers && !opts.zapCompactors && !opts.zapScanServers) { + final Predicate<String> groupPredicate; - final Predicate<HostAndPort> hostPortPredicate; ++ final AddressSelector addressSelector; + + if (opts.hostPortExcludeFile != null) { + try { + var hostPorts = Files.lines(java.nio.file.Path.of(opts.hostPortExcludeFile)) + .map(String::trim).map(HostAndPort::fromString).collect(Collectors.toSet()); - hostPortPredicate = hp -> !hostPorts.contains(hp); ++ addressSelector = ++ AddressSelector.matching(hp -> !hostPorts.contains(HostAndPort.fromString(hp))); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } else { - hostPortPredicate = hp -> true; ++ addressSelector = AddressSelector.all(); + } + ++ final ResourceGroupPredicate rgp; ++ + if (opts.includeGroups != null) { + var groups = Arrays.stream(opts.includeGroups.split(",")).map(String::trim) + .collect(Collectors.toSet()); - groupPredicate = groups::contains; ++ rgp = groups::contains; + } else { - groupPredicate = g -> true; ++ rgp = g -> true; + } + - if (!opts.zapMaster && !opts.zapManager && !opts.zapTservers && !opts.zapCompactors - && !opts.zapCoordinators && !opts.zapScanServers && !opts.zapGc && !opts.zapMonitor) { ++ if (!opts.zapManager && !opts.zapTservers && 
!opts.zapCompactors && !opts.zapScanServers ++ && !opts.zapGc && !opts.zapMonitor) { new JCommander(opts).usage(); return; } - String volDir = VolumeConfiguration.getVolumeUris(siteConf).iterator().next(); - Path instanceDir = new Path(volDir, "instance_id"); - InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new Configuration()); - ZooReaderWriter zoo = new ZooReaderWriter(siteConf); - - if (opts.zapMaster) { - log.warn("The -master option is deprecated. Please use -manager instead."); - } - if (opts.zapManager || opts.zapMaster) { - String managerLockPath = Constants.ZROOT + "/" + iid + Constants.ZMANAGER_LOCK; - + var zrw = context.getZooSession().asReaderWriter(); + if (opts.zapManager) { + ServiceLockPath managerLockPath = context.getServerPaths().createManagerPath(); try { - zapDirectory(zrw, managerLockPath, opts); - removeSingletonLock(zoo, managerLockPath, hostPortPredicate, opts); ++ removeSingletonLock(zrw, managerLockPath, addressSelector, opts); } catch (KeeperException | InterruptedException e) { - e.printStackTrace(); + log.error("Error deleting manager lock", e); } } - ResourceGroupPredicate rgp; - if (!opts.resourceGroup.isEmpty()) { - rgp = rg -> rg.equals(opts.resourceGroup); - } else { - rgp = rg -> true; + if (opts.zapGc) { - String gcLockPath = Constants.ZROOT + "/" + iid + Constants.ZGC_LOCK; ++ ServiceLockPath gcLockPath = context.getServerPaths().createGarbageCollectorPath(); + try { - removeSingletonLock(zoo, gcLockPath, hostPortPredicate, opts); ++ removeSingletonLock(zrw, gcLockPath, addressSelector, opts); + } catch (KeeperException | InterruptedException e) { + log.error("Error deleting manager lock", e); + } + } + + if (opts.zapMonitor) { - String monitorLockPath = Constants.ZROOT + "/" + iid + Constants.ZMONITOR_LOCK; ++ ServiceLockPath monitorLockPath = context.getServerPaths().createMonitorPath(); + try { - removeSingletonLock(zoo, monitorLockPath, hostPortPredicate, opts); ++ removeSingletonLock(zrw, 
monitorLockPath, addressSelector, opts); + } catch (KeeperException | InterruptedException e) { + log.error("Error deleting monitor lock", e); + } } if (opts.zapTservers) { - String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS; try { - if ((opts.zapManager || opts.zapMaster) && opts.hostPortExcludeFile == null - && !opts.dryRun) { - // When shutting down all tablet servers and the manager, then completely clean up all - // tservers entries in zookeeper - List<String> children = zoo.getChildren(tserversPath); - for (String child : children) { - message("Deleting " + tserversPath + "/" + child + " from zookeeper", opts); - zoo.recursiveDelete(tserversPath + "/" + child, NodeMissingPolicy.SKIP); - } - } else { - removeLocks(zoo, tserversPath, hostPortPredicate, opts); + Set<ServiceLockPath> tserverLockPaths = - context.getServerPaths().getTabletServer(rgp, AddressSelector.all(), false); ++ context.getServerPaths().getTabletServer(rgp, addressSelector, false); + Set<String> tserverResourceGroupPaths = new HashSet<>(); + tserverLockPaths.forEach(p -> tserverResourceGroupPaths + .add(p.toString().substring(0, p.toString().lastIndexOf('/')))); + for (String group : tserverResourceGroupPaths) { + message("Deleting tserver " + group + " from zookeeper", opts); + zrw.recursiveDelete(group.toString(), NodeMissingPolicy.SKIP); } } catch (KeeperException | InterruptedException e) { - log.error("{}", e.getMessage(), e); + log.error("Error deleting tserver locks", e); } } - // Remove the tracers, we don't use them anymore. - @SuppressWarnings("deprecation") - String path = siteConf.get(Property.TRACE_ZK_PATH); - try { - zapDirectory(zoo, path, opts); - } catch (Exception e) { - // do nothing if the /tracers node does not exist. 
- } - - if (opts.zapCoordinators) { - final String coordinatorPath = Constants.ZROOT + "/" + iid + Constants.ZCOORDINATOR_LOCK; - try { - removeSingletonLock(zoo, coordinatorPath, hostPortPredicate, opts); - } catch (KeeperException | InterruptedException e) { - log.error("Error deleting coordinator from zookeeper", e); - } - } - if (opts.zapCompactors) { - String compactorsBasepath = Constants.ZROOT + "/" + iid + Constants.ZCOMPACTORS; + Set<ServiceLockPath> compactorLockPaths = - context.getServerPaths().getCompactor(rgp, AddressSelector.all(), false); ++ context.getServerPaths().getCompactor(rgp, addressSelector, false); + Set<String> compactorResourceGroupPaths = new HashSet<>(); + compactorLockPaths.forEach(p -> compactorResourceGroupPaths + .add(p.toString().substring(0, p.toString().lastIndexOf('/')))); try { - removeGroupedLocks(zoo, compactorsBasepath, groupPredicate, hostPortPredicate, opts); + for (String group : compactorResourceGroupPaths) { + message("Deleting compactor " + group + " from zookeeper", opts); + zrw.recursiveDelete(group, NodeMissingPolicy.SKIP); + } } catch (KeeperException | InterruptedException e) { - log.error("Error deleting compactors from zookeeper, {}", e.getMessage(), e); + log.error("Error deleting compactors from zookeeper", e); } } if (opts.zapScanServers) { - String sserversPath = Constants.ZROOT + "/" + iid + Constants.ZSSERVERS; + Set<ServiceLockPath> sserverLockPaths = - context.getServerPaths().getScanServer(rgp, AddressSelector.all(), false); ++ context.getServerPaths().getScanServer(rgp, addressSelector, false); + Set<String> sserverResourceGroupPaths = new HashSet<>(); + sserverLockPaths.forEach(p -> sserverResourceGroupPaths + .add(p.toString().substring(0, p.toString().lastIndexOf('/')))); + try { - removeGroupedLocks(zoo, sserversPath, groupPredicate, hostPortPredicate, opts); + for (String group : sserverResourceGroupPaths) { + message("Deleting sserver " + group + " from zookeeper", opts); + 
zrw.recursiveDelete(group, NodeMissingPolicy.SKIP); + } } catch (KeeperException | InterruptedException e) { - log.error("{}", e.getMessage(), e); + log.error("Error deleting scan server locks", e); } } } - private static void zapDirectory(ZooReaderWriter zoo, String path, Opts opts) + private static void zapDirectory(ZooReaderWriter zoo, ServiceLockPath path, Opts opts) throws KeeperException, InterruptedException { - List<String> children = zoo.getChildren(path); + List<String> children = zoo.getChildren(path.toString()); for (String child : children) { message("Deleting " + path + "/" + child + " from zookeeper", opts); - zoo.recursiveDelete(path + "/" + child, NodeMissingPolicy.SKIP); + if (!opts.dryRun) { + zoo.recursiveDelete(path + "/" + child, NodeMissingPolicy.SKIP); + } + } + } + - private static void removeGroupedLocks(ZooReaderWriter zoo, String path, - Predicate<String> groupPredicate, Predicate<HostAndPort> hostPortPredicate, Opts opts) - throws KeeperException, InterruptedException { - if (zoo.exists(path)) { - List<String> groups = zoo.getChildren(path); - for (String group : groups) { - if (groupPredicate.test(group)) { - removeLocks(zoo, path + "/" + group, hostPortPredicate, opts); - } - } - } - } - - private static void removeLocks(ZooReaderWriter zoo, String path, - Predicate<HostAndPort> hostPortPredicate, Opts opts) - throws KeeperException, InterruptedException { - if (zoo.exists(path)) { - List<String> children = zoo.getChildren(path); - for (String child : children) { - if (hostPortPredicate.test(HostAndPort.fromString(child))) { - message("Deleting " + path + "/" + child + " from zookeeper", opts); - if (!opts.dryRun) { - // TODO not sure this is the correct way to delete this lock.. the code was deleting - // locks in multiple different ways for diff servers types. 
- zoo.recursiveDelete(path + "/" + child, NodeMissingPolicy.SKIP); - } - } - } - } - } - - private static void removeSingletonLock(ZooReaderWriter zoo, String path, - Predicate<HostAndPort> hostPortPredicate, Opts ops) - throws KeeperException, InterruptedException { - var lockData = ServiceLock.getLockData(zoo.getZooKeeper(), ServiceLock.path(path)); - if (lockData != null - && hostPortPredicate.test(HostAndPort.fromString(new String(lockData, UTF_8)))) { ++ private static void removeSingletonLock(ZooReaderWriter zoo, ServiceLockPath path, ++ AddressSelector addressSelector, Opts ops) throws KeeperException, InterruptedException { ++ if (addressSelector.getPredicate().test(path.getServer())) { + zapDirectory(zoo, path, ops); } } + }
