This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 00381f90e263e910edf48b25b98a607241d0e907 Merge: f65e15d954 89efffe440 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri Apr 4 20:19:26 2025 +0000 Merge branch '2.1' .../java/org/apache/accumulo/core/Constants.java | 1 + .../org/apache/accumulo/server/AbstractServer.java | 13 ++ .../apache/accumulo/server/util/UpgradeUtil.java | 138 +++++++++++++++++++++ .../apache/accumulo/test/start/KeywordStartIT.java | 2 + 4 files changed, 154 insertions(+) diff --cc core/src/main/java/org/apache/accumulo/core/Constants.java index c77fc5d99c,2a36272bea..b8fc084004 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@@ -81,7 -91,10 +81,8 @@@ public class Constants public static final String ZHDFS_RESERVATIONS = "/hdfs_reservations"; public static final String ZRECOVERY = "/recovery"; + public static final String ZPREPARE_FOR_UPGRADE = "/upgrade_ready"; + public static final String ZUPGRADE_PROGRESS = "/upgrade_progress"; /** * Base znode for storing secret keys that back delegation tokens diff --cc server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index be1c6fba0c,20f7db5b12..13d59ca560 --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@@ -46,14 -35,10 +46,15 @@@ import org.apache.accumulo.core.process import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Halt; +import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.server.mem.LowMemoryDetector; +import org.apache.accumulo.server.metrics.MetricResponseWrapper; import org.apache.accumulo.server.metrics.ProcessMetrics; import org.apache.accumulo.server.security.SecurityUtil; +import org.apache.thrift.TException; + import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -81,20 -61,30 +82,32 @@@ public abstract class AbstractServe private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); private final AtomicBoolean shutdownComplete = new AtomicBoolean(false); - protected AbstractServer(String appName, ServerOpts opts, String[] args) { - this.log = LoggerFactory.getLogger(getClass().getName()); - this.applicationName = appName; - opts.parseArgs(appName, args); - this.hostname = Objects.requireNonNull(opts.getAddress()); + protected AbstractServer(ServerId.Type serverType, ConfigOpts opts, + Function<SiteConfiguration,ServerContext> serverContextFactory, String[] args) { + this.applicationName = serverType.name(); + opts.parseArgs(applicationName, args); var siteConfig = opts.getSiteConfiguration(); + this.hostname = siteConfig.get(Property.GENERAL_PROCESS_BIND_ADDRESS); + this.resourceGroup = getResourceGroupPropertyValue(siteConfig); + ClusterConfigParser.validateGroupNames(List.of(resourceGroup)); SecurityUtil.serverLogin(siteConfig); - context = new ServerContext(siteConfig); - final String upgradePrepNode = context.getZooKeeperRoot() + Constants.ZPREPARE_FOR_UPGRADE; + context = serverContextFactory.apply(siteConfig); + log = LoggerFactory.getLogger(getClass()); + try { - if (context.getZooReader().exists(upgradePrepNode)) { ++ if (context.getZooSession().asReader().exists(Constants.ZPREPARE_FOR_UPGRADE)) { + throw new IllegalStateException( + "Instance has been prepared for upgrade to a minor or major version greater than " + + Constants.VERSION + ", no servers can be started." + + " To undo this state and abort upgrade preparations delete the zookeeper node: " - + upgradePrepNode); ++ + Constants.ZPREPARE_FOR_UPGRADE); + } + } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException( - "Error checking for upgrade preparation node (" + upgradePrepNode + ") in zookeeper", e); ++ throw new IllegalStateException("Error checking for upgrade preparation node (" ++ + Constants.ZPREPARE_FOR_UPGRADE + ") in zookeeper", e); + } log.info("Version " + Constants.VERSION); log.info("Instance " + context.getInstanceID()); - context.init(appName); + context.init(applicationName); ClassLoaderUtil.initContextFactory(context.getConfiguration()); TraceUtil.initializeTracer(context.getConfiguration()); if (context.getSaslParams() != null) { diff --cc server/base/src/main/java/org/apache/accumulo/server/util/UpgradeUtil.java index 0000000000,5c880da99d..6cc1537b75 mode 000000,100644..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/UpgradeUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/UpgradeUtil.java @@@ -1,0 -1,146 +1,138 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.accumulo.server.util; + -import org.apache.accumulo.core.Constants; ++import static org.apache.accumulo.core.Constants.ZFATE; ++import static org.apache.accumulo.core.Constants.ZPREPARE_FOR_UPGRADE; ++ + import org.apache.accumulo.core.cli.Help; + import org.apache.accumulo.core.conf.Property; + import org.apache.accumulo.core.conf.SiteConfiguration; -import org.apache.accumulo.core.data.InstanceId; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath; + import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; + import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; -import org.apache.accumulo.core.volume.VolumeConfiguration; -import org.apache.accumulo.server.fs.VolumeManager; ++import org.apache.accumulo.core.zookeeper.ZooSession; ++import org.apache.accumulo.server.ServerContext; + import org.apache.accumulo.server.security.SecurityUtil; + import org.apache.accumulo.start.spi.KeywordExecutable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; + import org.apache.zookeeper.KeeperException; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import com.beust.jcommander.JCommander; + import com.beust.jcommander.Parameter; + import com.google.auto.service.AutoService; + + @AutoService(KeywordExecutable.class) + public class UpgradeUtil implements KeywordExecutable { + + private static final Logger LOG = LoggerFactory.getLogger(UpgradeUtil.class); + + static class Opts extends Help { + @Parameter(names = "--prepare", + description = "prepare an older version instance for an upgrade to a newer non-bugfix release." + + " This command should be run using the older version of software after the instance is shut down.") + boolean prepare = false; + } + + @Override + public String keyword() { + return "upgrade"; + } + + @Override + public String description() { + return "utility used to perform various upgrade steps for an Accumulo instance"; + } + + @Override + public void execute(String[] args) throws Exception { + Opts opts = new Opts(); + opts.parseArgs(keyword(), args); + + if (!opts.prepare) { + new JCommander(opts).usage(); + return; + } + + var siteConf = SiteConfiguration.auto(); + // Login as the server on secure HDFS + if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + SecurityUtil.serverLogin(siteConf); + } + - String volDir = VolumeConfiguration.getVolumeUris(siteConf).iterator().next(); - Path instanceDir = new Path(volDir, "instance_id"); - InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new Configuration()); - ZooReaderWriter zoo = new ZooReaderWriter(siteConf); - - if (opts.prepare) { - final String zUpgradepath = Constants.ZROOT + "/" + iid + Constants.ZPREPARE_FOR_UPGRADE; - try { - if (zoo.exists(zUpgradepath)) { - zoo.delete(zUpgradepath); ++ try (var context = new ServerContext(siteConf)) { ++ final ZooSession zs = context.getZooSession(); ++ final ZooReaderWriter zoo = zs.asReaderWriter(); ++ if (opts.prepare) { ++ try { ++ if (zoo.exists(ZPREPARE_FOR_UPGRADE)) { ++ zoo.delete(ZPREPARE_FOR_UPGRADE); ++ } ++ } catch (KeeperException | InterruptedException e) { ++ throw new IllegalStateException("Error creating or checking for " + ZPREPARE_FOR_UPGRADE ++ + " node in zookeeper: " + e.getMessage(), e); + } - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException("Error creating or checking for " + zUpgradepath - + " node in zookeeper: " + e.getMessage(), e); - } + - LOG.info("Upgrade specified, validating that Manager is stopped"); - final ServiceLockPath mgrPath = - ServiceLock.path(Constants.ZROOT + "/" + iid + Constants.ZMANAGER_LOCK); - try { - if (ServiceLock.getLockData(zoo.getZooKeeper(), mgrPath) != null) { ++ LOG.info("Upgrade specified, validating that Manager is stopped"); ++ if (context.getServerPaths().getManager(true) != null) { + throw new IllegalStateException( + "Manager is running, shut it down and retry this operation"); + } - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException("Error trying to determine if Manager lock is held", e); - } + - LOG.info("Checking for existing fate transactions"); - try { - // Adapted from UpgradeCoordinator.abortIfFateTransactions - if (!zoo.getChildren(Constants.ZFATE).isEmpty()) { - throw new IllegalStateException("Cannot complete upgrade preparation" - + " because FATE transactions exist. You can start a tserver, but" - + " not the Manager, then use the shell to delete completed" - + " transactions and fail pending or in-progress transactions." - + " Once all of the FATE transactions have been removed you can" - + " retry this operation."); ++ LOG.info("Checking for existing fate transactions"); ++ try { ++ // Adapted from UpgradeCoordinator.abortIfFateTransactions ++ // TODO: After the 4.0.0 release this code block needs to be ++ // modified to account for the new Fate table. ++ if (!zoo.getChildren(ZFATE).isEmpty()) { ++ throw new IllegalStateException("Cannot complete upgrade preparation" ++ + " because FATE transactions exist. You can start a tserver, but" ++ + " not the Manager, then use the shell to delete completed" ++ + " transactions and fail pending or in-progress transactions." ++ + " Once all of the FATE transactions have been removed you can" ++ + " retry this operation."); ++ } ++ } catch (KeeperException | InterruptedException e) { ++ throw new IllegalStateException("Error checking for existing FATE transactions", e); + } - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException("Error checking for existing FATE transactions", e); - } + - LOG.info("Creating {} node in zookeeper, servers will be prevented from" - + " starting while this node exists", zUpgradepath); - try { - zoo.putPersistentData(zUpgradepath, new byte[0], NodeExistsPolicy.SKIP); - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException("Error creating " + zUpgradepath - + " node in zookeeper. Check for any issues and retry.", e); - } ++ LOG.info("Creating {} node in zookeeper, servers will be prevented from" ++ + " starting while this node exists", ZPREPARE_FOR_UPGRADE); ++ try { ++ zoo.putPersistentData(ZPREPARE_FOR_UPGRADE, new byte[0], NodeExistsPolicy.SKIP); ++ } catch (KeeperException | InterruptedException e) { ++ throw new IllegalStateException("Error creating " + ZPREPARE_FOR_UPGRADE ++ + " node in zookeeper. Check for any issues and retry.", e); ++ } + - LOG.info("Forcing removal of all server locks"); - new ZooZap().zap(siteConf, "-manager", "-compaction-coordinators", "-tservers", "-compactors", - "-sservers"); ++ LOG.info("Forcing removal of all server locks"); ++ new ZooZap().zap(context, "-manager", "-tservers", "-compactors", "-sservers"); + - LOG.info("Instance {} prepared for upgrade. Server processes will not start while" - + " in this state. To undo this state and abort upgrade preparations delete" - + " the zookeeper node: {}. If you abort and restart the instance, then you " - + " should re-run this utility before upgrading.", iid.canonical(), zUpgradepath); ++ LOG.info( ++ "Instance {} prepared for upgrade. Server processes will not start while" ++ + " in this state. To undo this state and abort upgrade preparations delete" ++ + " the zookeeper node: {}. If you abort and restart the instance, then you " ++ + " should re-run this utility before upgrading.", ++ context.getInstanceID(), ZPREPARE_FOR_UPGRADE); ++ } + } + + } + + }