This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 9e34d27f4d823215af995696275951644f11263b Merge: 439ea97c7b ee80f540cd Author: Dave Marion <[email protected]> AuthorDate: Wed Jun 18 18:32:13 2025 +0000 Merge branch '2.1' .../org/apache/accumulo/core/conf/Property.java | 5 + minicluster/pom.xml | 4 + .../MiniAccumuloClusterControl.java | 39 ++- .../org/apache/accumulo/server/AbstractServer.java | 47 +++- .../org/apache/accumulo/compactor/Compactor.java | 7 +- .../apache/accumulo/gc/SimpleGarbageCollector.java | 7 +- .../java/org/apache/accumulo/manager/Manager.java | 19 +- .../apache/accumulo/monitor/EmbeddedWebServer.java | 6 +- .../java/org/apache/accumulo/monitor/Monitor.java | 26 +- .../org/apache/accumulo/tserver/ScanServer.java | 7 +- .../org/apache/accumulo/tserver/TabletServer.java | 19 +- .../test/functional/AdvertiseAndBindIT.java | 261 +++++++++++++++++++++ .../test/functional/ConfigurableMacBase.java | 2 +- 13 files changed, 389 insertions(+), 60 deletions(-) diff --cc minicluster/pom.xml index 9904356bff,4a304429b0..109796cc9b --- a/minicluster/pom.xml +++ b/minicluster/pom.xml @@@ -79,6 -83,6 +79,10 @@@ <groupId>org.apache.accumulo</groupId> <artifactId>accumulo-tserver</artifactId> </dependency> ++ <dependency> ++ <groupId>org.apache.commons</groupId> ++ <artifactId>commons-lang3</artifactId> ++ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client-api</artifactId> diff --cc minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java index 802f582621,5b83301611..25aed200b0 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java @@@ -40,6 -49,9 +40,7 @@@ import org.apache.accumulo.minicluster. import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; import org.apache.accumulo.server.util.Admin; import org.apache.accumulo.server.util.ZooZap; -import org.apache.accumulo.tserver.ScanServer; -import org.apache.thrift.TException; -import org.apache.thrift.transport.TTransportException; ++import org.apache.commons.lang3.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -123,16 -194,17 +124,16 @@@ public class MiniAccumuloClusterContro @Override public synchronized void start(ServerType server, String hostname) throws IOException { - start(server, Collections.emptyMap(), Integer.MAX_VALUE, null); - start(server, Collections.emptyMap(), Integer.MAX_VALUE); ++ start(server, Collections.emptyMap(), Integer.MAX_VALUE, null, new String[] {}); } public synchronized void start(ServerType server, Map<String,String> configOverrides, int limit) throws IOException { - start(server, configOverrides, limit, null); - start(server, configOverrides, limit, new String[] {}); ++ start(server, configOverrides, limit, null, new String[] {}); } - @SuppressWarnings("removal") public synchronized void start(ServerType server, Map<String,String> configOverrides, int limit, - Class<?> classOverride) throws IOException { - String... args) throws IOException { ++ Class<?> classOverride, String... args) throws IOException { if (limit <= 0) { return; } @@@ -147,22 -214,18 +148,27 @@@ switch (server) { case TABLET_SERVER: synchronized (tabletServerProcesses) { - int count = 0; - for (int i = tabletServerProcesses.size(); - count < limit && i < cluster.getConfig().getNumTservers(); i++, ++count) { - tabletServerProcesses - .add(cluster._exec(classToUse, server, configOverrides, args).getProcess()); + Map<String,Integer> tserverGroups = + cluster.getConfig().getClusterServerConfiguration().getTabletServerConfiguration(); + for (Entry<String,Integer> e : tserverGroups.entrySet()) { + List<Process> processes = + tabletServerProcesses.computeIfAbsent(e.getKey(), k -> new ArrayList<>()); + int count = 0; + for (int i = processes.size(); count < limit && i < e.getValue(); i++, ++count) { - processes.add(cluster._exec(classToUse, server, configOverrides, "-o", - Property.TSERV_GROUP_NAME.getKey() + "=" + e.getKey()).getProcess()); ++ processes ++ .add( ++ cluster ++ ._exec(classToUse, server, configOverrides, ++ ArrayUtils.addAll(args, "-o", ++ Property.TSERV_GROUP_NAME.getKey() + "=" + e.getKey())) ++ .getProcess()); + } } } break; - case MASTER: case MANAGER: if (managerProcess == null) { - managerProcess = cluster._exec(classToUse, server, configOverrides).getProcess(); + managerProcess = cluster._exec(classToUse, server, configOverrides, args).getProcess(); } break; case ZOOKEEPER: @@@ -184,31 -247,39 +190,40 @@@ break; case SCAN_SERVER: synchronized (scanServerProcesses) { - int count = 0; - for (int i = scanServerProcesses.size(); - count < limit && i < cluster.getConfig().getNumScanServers(); i++, ++count) { - scanServerProcesses - .add(cluster._exec(classToUse, server, configOverrides, args).getProcess()); - } - } - break; - case COMPACTION_COORDINATOR: - if (coordinatorProcess == null) { - coordinatorProcess = - cluster._exec(classToUse, ServerType.COMPACTION_COORDINATOR, configOverrides, args) - .getProcess(); - // Wait for coordinator to start - TExternalCompactionList metrics = null; - while (metrics == null) { - try { - metrics = getRunningCompactions(cluster.getServerContext()); - } catch (TException e) { - log.debug( - "Error getting running compactions from coordinator, message: " + e.getMessage()); - UtilWaitThread.sleep(250); + Map<String,Integer> sserverGroups = + cluster.getConfig().getClusterServerConfiguration().getScanServerConfiguration(); + for (Entry<String,Integer> e : sserverGroups.entrySet()) { + List<Process> processes = + scanServerProcesses.computeIfAbsent(e.getKey(), k -> new ArrayList<>()); + int count = 0; + for (int i = processes.size(); count < limit && i < e.getValue(); i++, ++count) { - processes.add(cluster._exec(classToUse, server, configOverrides, "-o", - Property.SSERV_GROUP_NAME.getKey() + "=" + e.getKey()).getProcess()); ++ processes ++ .add( ++ cluster ++ ._exec(classToUse, server, configOverrides, ++ ArrayUtils.addAll(args, "-o", ++ Property.SSERV_GROUP_NAME.getKey() + "=" + e.getKey())) ++ .getProcess()); } } } break; case COMPACTOR: synchronized (compactorProcesses) { - int count = - Math.min(limit, cluster.getConfig().getNumCompactors() - compactorProcesses.size()); - for (int i = 0; i < count; i++) { - compactorProcesses - .add(cluster._exec(classToUse, server, configOverrides, args).getProcess()); + Map<String,Integer> compactorGroups = + cluster.getConfig().getClusterServerConfiguration().getCompactorConfiguration(); + for (Entry<String,Integer> e : compactorGroups.entrySet()) { + List<Process> processes = + compactorProcesses.computeIfAbsent(e.getKey(), k -> new ArrayList<>()); + int count = 0; + for (int i = processes.size(); count < limit && i < e.getValue(); i++, ++count) { - processes.add(cluster._exec(classToUse, server, configOverrides, "-o", - Property.COMPACTOR_GROUP_NAME.getKey() + "=" + e.getKey()).getProcess()); ++ processes ++ .add(cluster ++ ._exec(classToUse, server, configOverrides, ++ ArrayUtils.addAll(args, "-o", ++ Property.COMPACTOR_GROUP_NAME.getKey() + "=" + e.getKey())) ++ .getProcess()); + } } } break; diff --cc server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index 2b6fd2a360,cd5de7c442..6a236be6ac --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@@ -68,40 -49,63 +68,54 @@@ import io.micrometer.core.instrument.Me public abstract class AbstractServer implements AutoCloseable, MetricsProducer, Runnable, ServerProcessService.Iface { + private final MetricSource metricSource; private final ServerContext context; protected final String applicationName; - private volatile String hostname; - private HostAndPort advertiseAddress; // used for everything but the Thrift server (e.g. ZK, - // metadata, etc). ++ private volatile HostAndPort advertiseAddress; // used for everything but the Thrift server (e.g. ++ // ZK, ++ // metadata, etc). + private final String bindAddress; // used for the Thrift server + private final String resourceGroup; private final Logger log; private final ProcessMetrics processMetrics; - protected final long idleReportingPeriodNanos; - private volatile long idlePeriodStartNanos = 0L; + protected final long idleReportingPeriodMillis; + private volatile Timer idlePeriodTimer = null; private volatile Thread serverThread; private volatile Thread verificationThread; private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); private final AtomicBoolean shutdownComplete = new AtomicBoolean(false); - protected AbstractServer(String appName, ServerOpts opts, String[] args) { - this.log = LoggerFactory.getLogger(getClass().getName()); - this.applicationName = appName; - opts.parseArgs(appName, args); + protected AbstractServer(ServerId.Type serverType, ConfigOpts opts, + Function<SiteConfiguration,ServerContext> serverContextFactory, String[] args) { ++ log = LoggerFactory.getLogger(getClass()); + this.applicationName = serverType.name(); + opts.parseArgs(applicationName, args); var siteConfig = opts.getSiteConfiguration(); - boolean oldBindParameterSpecifiedOnCmdLine = false; - boolean newBindParameterSpecified = false; - for (String arg : args) { - if (arg.equals("-a") || arg.equals("--address")) { - oldBindParameterSpecifiedOnCmdLine = true; - } else if (siteConfig.isPropertySet(Property.RPC_PROCESS_BIND_ADDRESS)) { - newBindParameterSpecified = true; - } - } - if (oldBindParameterSpecifiedOnCmdLine && newBindParameterSpecified) { - throw new IllegalStateException("Argument '-a' cannot be used with property 'rpc.bind.addr'"); - } final String newBindParameter = siteConfig.get(Property.RPC_PROCESS_BIND_ADDRESS); // If new bind parameter passed on command line or in file, then use it. - if (newBindParameterSpecified - || !newBindParameter.equals(Property.RPC_PROCESS_BIND_ADDRESS.getDefaultValue())) { + if (newBindParameter != null + && !newBindParameter.equals(Property.RPC_PROCESS_BIND_ADDRESS.getDefaultValue())) { - this.hostname = newBindParameter; + this.bindAddress = newBindParameter; - } else if (oldBindParameterSpecifiedOnCmdLine) { - this.bindAddress = opts.getAddress(); } else { - this.hostname = ConfigOpts.BIND_ALL_ADDRESSES; - this.bindAddress = ServerOpts.BIND_ALL_ADDRESSES; ++ this.bindAddress = ConfigOpts.BIND_ALL_ADDRESSES; } + String advertAddr = siteConfig.get(Property.RPC_PROCESS_ADVERTISE_ADDRESS); + if (advertAddr != null && !advertAddr.isBlank()) { + HostAndPort advertHP = HostAndPort.fromString(advertAddr); - if (advertHP.getHost().equals(ServerOpts.BIND_ALL_ADDRESSES)) { ++ if (advertHP.getHost().equals(ConfigOpts.BIND_ALL_ADDRESSES)) { + throw new IllegalArgumentException("Advertise address cannot be 0.0.0.0"); + } + advertiseAddress = advertHP; + } else { + advertiseAddress = null; + } + log.info("Bind address: {}, advertise address: {}", bindAddress, advertiseAddress); + this.resourceGroup = getResourceGroupPropertyValue(siteConfig); + ClusterConfigParser.validateGroupNames(List.of(resourceGroup)); SecurityUtil.serverLogin(siteConfig); - context = new ServerContext(siteConfig); - final String upgradePrepNode = context.getZooKeeperRoot() + Constants.ZPREPARE_FOR_UPGRADE; + context = serverContextFactory.apply(siteConfig); - log = LoggerFactory.getLogger(getClass()); try { - if (context.getZooReader().exists(upgradePrepNode)) { + if (context.getZooSession().asReader().exists(Constants.ZPREPARE_FOR_UPGRADE)) { throw new IllegalStateException( "Instance has been prepared for upgrade to a minor or major version greater than " + Constants.VERSION + ", no servers can be started." @@@ -277,15 -243,23 +291,24 @@@ if (processMetrics != null) { processMetrics.registerMetrics(registry); } + getContext().setMeterRegistry(registry); } - public String getHostname() { - return hostname; + public HostAndPort getAdvertiseAddress() { + return advertiseAddress; } - public void setHostname(HostAndPort address) { - hostname = address.toString(); + public String getBindAddress() { + return bindAddress; + } + + protected void updateAdvertiseAddress(HostAndPort thriftBindAddress) { + if (advertiseAddress == null) { + advertiseAddress = thriftBindAddress; + } else if (!advertiseAddress.hasPort()) { + advertiseAddress = + HostAndPort.fromParts(advertiseAddress.getHost(), thriftBindAddress.getPort()); + } } public ServerContext getContext() { @@@ -300,47 -274,6 +323,49 @@@ return applicationName; } + @Override + public MetricResponse getMetrics(TInfo tinfo, TCredentials credentials) throws TException { + + if (!context.getSecurityOperation().authenticateUser(credentials, credentials)) { + throw new ThriftSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED); + } + + final FlatBufferBuilder builder = new FlatBufferBuilder(1024); + final MetricResponseWrapper response = new MetricResponseWrapper(builder); + - if (getHostname().startsWith(Property.RPC_PROCESS_BIND_ADDRESS.getDefaultValue())) { - log.error("Host is not set, this should have been done after starting the Thrift service."); ++ if (getAdvertiseAddress().toString() ++ .startsWith(Property.RPC_PROCESS_BIND_ADDRESS.getDefaultValue())) { ++ log.error( ++ "Advertise address is not set, this should have been done after starting the Thrift service."); + return response; + } + + if (metricSource == null) { + // Metrics not reported for Monitor type + return response; + } + + response.setServerType(metricSource); - response.setServer(getHostname()); ++ response.setServer(getAdvertiseAddress().toString()); + response.setResourceGroup(getResourceGroup()); + response.setTimestamp(System.currentTimeMillis()); + + if (context.getMetricsInfo().isMetricsEnabled()) { + Metrics.globalRegistry.getMeters().forEach(m -> { + if (m.getId().getName().startsWith("accumulo.")) { + m.match(response::writeMeter, response::writeMeter, response::writeTimer, + response::writeDistributionSummary, response::writeLongTaskTimer, + response::writeMeter, response::writeMeter, response::writeFunctionTimer, + response::writeMeter); + } + }); + } + + builder.clear(); + return response; + } + /** * Get the ServiceLock for this server process. May return null if called before the lock is * acquired. diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index e67bcdec33,ce18a0ef9e..46789203f6 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -320,16 -319,15 +320,16 @@@ public class Compactor extends Abstract * @throws UnknownHostException host unknown */ protected ServerAddress startCompactorClientService() throws UnknownHostException { - var processor = ThriftProcessorTypes.getCompactorTProcessor(this, this, getContext()); - @SuppressWarnings("deprecation") - var maxMessageSizeProperty = getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE, - Property.GENERAL_MAX_MESSAGE_SIZE); - ServerAddress sp = TServerUtils.startServer(getContext(), getBindAddress(), + + ClientServiceHandler clientHandler = new ClientServiceHandler(getContext()); + var processor = ThriftProcessorTypes.getCompactorTProcessor(this, clientHandler, + getCompactorThriftHandlerInterface(), getContext()); - ServerAddress sp = TServerUtils.createThriftServer(getContext(), getHostname(), ++ ServerAddress sp = TServerUtils.createThriftServer(getContext(), getBindAddress(), Property.COMPACTOR_CLIENTPORT, processor, this.getClass().getSimpleName(), - "Thrift Client Server", Property.COMPACTOR_PORTSEARCH, Property.COMPACTOR_MINTHREADS, - Property.COMPACTOR_MINTHREADS_TIMEOUT, Property.COMPACTOR_THREADCHECK, - maxMessageSizeProperty); + Property.COMPACTOR_PORTSEARCH, Property.COMPACTOR_MINTHREADS, + Property.COMPACTOR_MINTHREADS_TIMEOUT, Property.COMPACTOR_THREADCHECK); + sp.startThriftServer("Thrift Client Server"); - setHostname(sp.address); ++ updateAdvertiseAddress(sp.address); LOG.info("address = {}", sp.address); return sp; } diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index fecb3e7145,a7cfca69b8..c0f9fd3084 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@@ -427,17 -424,19 +428,17 @@@ public class SimpleGarbageCollector ext private HostAndPort startStatsService() { var processor = ThriftProcessorTypes.getGcTProcessor(this, this, getContext()); IntStream port = getConfiguration().getPortStream(Property.GC_PORT); - HostAndPort[] addresses = TServerUtils.getHostAndPorts(getHostname(), port); - String hostname = HostAndPort.fromString(getBindAddress()).getHost(); - HostAndPort[] addresses = TServerUtils.getHostAndPorts(hostname, port); - @SuppressWarnings("deprecation") - var maxMessageSizeProperty = getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE, - Property.GENERAL_MAX_MESSAGE_SIZE); - long maxMessageSize = getConfiguration().getAsBytes(maxMessageSizeProperty); - ServerAddress server = TServerUtils.startTServer(getConfiguration(), - getContext().getThriftServerType(), processor, this.getClass().getSimpleName(), - "GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, maxMessageSize, - getContext().getServerSslParams(), getContext().getSaslParams(), 0, - getConfiguration().getCount(Property.RPC_BACKLOG), getContext().getMetricsInfo(), false, - addresses); - log.debug("Starting garbage collector listening on " + server.address); ++ HostAndPort[] addresses = TServerUtils.getHostAndPorts(getBindAddress(), port); + long maxMessageSize = getConfiguration().getAsBytes(Property.RPC_MAX_MESSAGE_SIZE); + ServerAddress server = + TServerUtils.createThriftServer(getConfiguration(), getContext().getThriftServerType(), + processor, this.getClass().getSimpleName(), 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, + 1000, maxMessageSize, getContext().getServerSslParams(), getContext().getSaslParams(), + 0, getConfiguration().getCount(Property.RPC_BACKLOG), getContext().getMetricsInfo(), + false, addresses); + server.startThriftServer("GC Monitor Service"); - setHostname(server.address); ++ updateAdvertiseAddress(server.address); + log.debug("Starting garbage collector listening on {}", server.address); return server.address; } diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index bdf6507ed0,2acf1fadf2..28b435ef13 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@@ -1121,40 -1233,46 +1121,40 @@@ public class Manager extends AbstractSe // Start the Manager's Fate Service fateServiceHandler = new FateServiceHandler(this); managerClientHandler = new ManagerClientServiceHandler(this); - // Start the Manager's Client service - // Ensure that calls before the manager gets the lock fail - ManagerClientService.Iface haProxy = - HighlyAvailableServiceWrapper.service(managerClientHandler, this); + compactionCoordinator = new CompactionCoordinator(this, fateRefs); ServerAddress sa; - var processor = - ThriftProcessorTypes.getManagerTProcessor(this, fateServiceHandler, haProxy, getContext()); + var processor = ThriftProcessorTypes.getManagerTProcessor(this, fateServiceHandler, + compactionCoordinator.getThriftService(), managerClientHandler, getContext()); try { - sa = TServerUtils.createThriftServer(context, getHostname(), Property.MANAGER_CLIENTPORT, - @SuppressWarnings("deprecation") - var maxMessageSizeProperty = getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE, - Property.GENERAL_MAX_MESSAGE_SIZE); - sa = TServerUtils.startServer(context, getBindAddress(), Property.MANAGER_CLIENTPORT, - processor, "Manager", "Manager Client Service Handler", null, Property.MANAGER_MINTHREADS, - Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK, - maxMessageSizeProperty); ++ sa = TServerUtils.createThriftServer(context, getBindAddress(), Property.MANAGER_CLIENTPORT, + processor, "Manager", null, Property.MANAGER_MINTHREADS, + Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK); } catch (UnknownHostException e) { - throw new IllegalStateException("Unable to start server on host " + getHostname(), e); + throw new IllegalStateException("Unable to start server on host " + getBindAddress(), e); } - clientService = sa.server; - log.info("Started Manager client service at {}", sa.address); - updateAdvertiseAddress(sa.getAddress()); - final HostAndPort clientAddress = getAdvertiseAddress(); - - // block until we can obtain the ZK lock for the manager + // block until we can obtain the ZK lock for the manager. Create the + // initial lock using ThriftService.NONE. This will allow the lock + // allocation to occur, but prevent any services from getting the + // Manager address for the COORDINATOR, FATE, and MANAGER services. + // The lock data is replaced below and the manager address is exposed + // for each of these services. + ServiceLockData sld; try { - getManagerLock(ServiceLock.path(zroot + Constants.ZMANAGER_LOCK), clientAddress); + sld = getManagerLock(context.getServerPaths().createManagerPath()); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException("Exception getting manager lock", e); } + // Setting the Manager state to HAVE_LOCK has the side-effect of + // starting the upgrade process if necessary. + setManagerState(ManagerState.HAVE_LOCK); - MetricsInfo metricsInfo = getContext().getMetricsInfo(); - - var producers = ManagerMetrics.getProducers(getConfiguration(), this); - producers.add(balancerMetrics); - metricsInfo.addMetricsProducers(producers.toArray(new MetricsProducer[0])); - metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(), - clientAddress, "")); + // Set the HostName **after** initially creating the lock. The lock data is + // updated below with the correct address. This prevents clients from accessing + // the Manager until all of the internal processes are started. - setHostname(sa.address); ++ updateAdvertiseAddress(sa.getAddress()); recoveryManager = new RecoveryManager(this, timeToCacheRecoveryWalExistence); @@@ -1381,27 -1407,10 +1381,27 @@@ log.info("AuthenticationTokenSecretManager is initialized"); } - String address = clientAddress.toString(); - log.info("Setting manager lock data to {}", address); + // Now that the Manager is up, start the ThriftServer + sa.startThriftServer("Manager Client Service Handler"); + clientService = sa.server; + log.info("Started Manager client service at {}", sa.address); + + // Replace the ServiceLockData information in the Manager lock node in ZooKeeper. + // This advertises the address that clients can use to connect to the Manager + // for the Coordinator, Fate, and Manager services. Do **not** do this until + // after the upgrade process is finished and the dependent services are started. + UUID uuid = sld.getServerUUID(ThriftService.NONE); + ServiceDescriptors descriptors = new ServiceDescriptors(); + for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, ThriftService.COORDINATOR, + ThriftService.FATE}) { - descriptors - .addService(new ServiceDescriptor(uuid, svc, getHostname(), this.getResourceGroup())); ++ descriptors.addService(new ServiceDescriptor(uuid, svc, getAdvertiseAddress().toString(), ++ this.getResourceGroup())); + } + + sld = new ServiceLockData(descriptors); + log.info("Setting manager lock data to {}", sld); try { - managerLock.replaceLockData(address.getBytes(UTF_8)); + managerLock.replaceLockData(sld); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException("Exception updating manager lock", e); } @@@ -1577,23 -1654,15 +1577,24 @@@ return managerLock; } - private void getManagerLock(final ServiceLockPath zManagerLoc, HostAndPort advertiseAddress) + private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc) throws KeeperException, InterruptedException { - var zooKeeper = getContext().getZooReaderWriter().getZooKeeper(); + var zooKeeper = getContext().getZooSession(); log.info("trying to get manager lock"); - final String managerClientAddress = - getHostname() + ":" + getConfiguration().getPort(Property.MANAGER_CLIENTPORT)[0]; - UUID zooLockUUID = UUID.randomUUID(); + + ServiceDescriptors descriptors = new ServiceDescriptors(); ++ // This method creates the lock with the ThriftServer set to NONE ++ // and the address set to 0.0.0.0. When the lock is acquired (could be ++ // waiting to due an HA-pair), then the Manager startup process begins ++ // and the lock service descriptors are updated with the advertise address + descriptors.addService(new ServiceDescriptor(zooLockUUID, ThriftService.NONE, - managerClientAddress, this.getResourceGroup())); ++ ConfigOpts.BIND_ALL_ADDRESSES, this.getResourceGroup())); + ServiceLockData sld = new ServiceLockData(descriptors); managerLock = new ServiceLock(zooKeeper, zManagerLoc, zooLockUUID); HAServiceLockWatcher managerLockWatcher = - new HAServiceLockWatcher("manager", () -> getShutdownComplete().get()); + new HAServiceLockWatcher(Type.MANAGER, () -> getShutdownComplete().get()); while (true) { diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java index 63de4dc002,696cbe6a96..22b6d0fcc9 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java @@@ -51,11 -51,7 +51,11 @@@ public class EmbeddedWebServer secure = requireForSecure.stream().map(conf::get).allMatch(s -> s != null && !s.isEmpty()); connector = new ServerConnector(server, getConnectionFactories(conf, secure)); + // Capture connection statistics + connector.addBean(monitor.getConnectionStatisticsBean()); + // Listen for connection events + connector.addBean(monitor); - connector.setHost(monitor.getHostname()); + connector.setHost(monitor.getBindAddress()); connector.setPort(port); handler = diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index eb535b4e66,f798ad3c30..ba5a3a858e --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@@ -377,39 -471,51 +377,47 @@@ public class Monitor extends AbstractSe throw new RuntimeException( "Unable to start embedded web server on ports: " + Arrays.toString(ports)); } else { - log.debug("Monitor started on port {}", livePort); + log.debug("Monitor listening on {}:{}", server.getHostName(), livePort); } - String advertiseHost = getHostname(); - if (advertiseHost.equals(ConfigOpts.BIND_ALL_ADDRESSES)) { - try { - advertiseHost = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - log.error("Unable to get hostname", e); - try { - getMonitorLock(); - } catch (Exception e) { - log.error("Failed to get Monitor ZooKeeper lock"); - throw new RuntimeException(e); - } - + HostAndPort advertiseAddress = getAdvertiseAddress(); + if (advertiseAddress == null) { + // use the bind address from the connector, unless it's null or 0.0.0.0 + String advertiseHost = server.getHostName(); - if (advertiseHost == null || advertiseHost == ServerOpts.BIND_ALL_ADDRESSES) { ++ if (advertiseHost == null || advertiseHost == ConfigOpts.BIND_ALL_ADDRESSES) { + try { + advertiseHost = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + throw new RuntimeException("Unable to get hostname for advertise address", e); + } } + updateAdvertiseAddress(HostAndPort.fromParts(advertiseHost, livePort)); + } else { + updateAdvertiseAddress(HostAndPort.fromParts(advertiseAddress.getHost(), livePort)); } - HostAndPort monitorHostAndPort = HostAndPort.fromParts(advertiseHost, livePort); + HostAndPort monitorHostAndPort = getAdvertiseAddress(); log.debug("Using {} to advertise monitor location in ZooKeeper", monitorHostAndPort); + try { - monitorLock.replaceLockData(monitorHostAndPort.toString().getBytes(UTF_8)); - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException("Exception updating monitor lock with host and port", e); + getMonitorLock(monitorHostAndPort); + } catch (Exception e) { + log.error("Failed to get Monitor ZooKeeper lock"); + throw new RuntimeException(e); } + getContext().setServiceLock(monitorLock); MetricsInfo metricsInfo = getContext().getMetricsInfo(); + metricsInfo.addMetricsProducers(this); metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(), - monitorHostAndPort, "")); + monitorHostAndPort, getResourceGroup())); try { - URL url = new URL(server.isSecure() ? "https" : "http", advertiseHost, server.getPort(), "/"); + URL url = new URL(server.isSecure() ? "https" : "http", monitorHostAndPort.getHost(), + server.getPort(), "/"); - final String path = context.getZooKeeperRoot() + Constants.ZMONITOR_HTTP_ADDR; - final ZooReaderWriter zoo = context.getZooReaderWriter(); + final ZooReaderWriter zoo = context.getZooSession().asReaderWriter(); // Delete before we try to re-create in case the previous session hasn't yet expired - zoo.delete(path); - zoo.putEphemeralData(path, url.toString().getBytes(UTF_8)); + zoo.delete(Constants.ZMONITOR_HTTP_ADDR); + zoo.putEphemeralData(Constants.ZMONITOR_HTTP_ADDR, url.toString().getBytes(UTF_8)); log.info("Set monitor address in zookeeper to {}", url); } catch (Exception ex) { log.error("Unable to advertise monitor HTTP address in zookeeper", ex); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 595df77ba4,78ddee2b85..8c93df5fb3 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@@ -300,16 -306,16 +300,16 @@@ public class ScanServer extends Abstrac // This class implements TabletClientService.Iface and then delegates calls. Be sure // to set up the ThriftProcessor using this class, not the delegate. - TProcessor processor = ThriftProcessorTypes.getScanServerTProcessor(this, this, getContext()); + ClientServiceHandler clientHandler = new ClientServiceHandler(context); + TProcessor processor = + ThriftProcessorTypes.getScanServerTProcessor(this, clientHandler, this, getContext()); - ServerAddress sp = TServerUtils.createThriftServer(getContext(), getHostname(), - @SuppressWarnings("deprecation") - var maxMessageSizeProperty = getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE, - Property.GENERAL_MAX_MESSAGE_SIZE); - ServerAddress sp = TServerUtils.startServer(getContext(), getBindAddress(), ++ ServerAddress sp = TServerUtils.createThriftServer(getContext(), getBindAddress(), Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(), - "Thrift Client Server", Property.SSERV_PORTSEARCH, Property.SSERV_MINTHREADS, - Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK, maxMessageSizeProperty); - + Property.SSERV_PORTSEARCH, Property.SSERV_MINTHREADS, Property.SSERV_MINTHREADS_TIMEOUT, + Property.SSERV_THREADCHECK); + sp.startThriftServer("Thrift Client Server"); - setHostname(sp.address); ++ updateAdvertiseAddress(sp.address); LOG.info("address = {}", sp.address); return sp; } diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index d75b9cd021,7d910f7e4b..1507b4bf9f --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -409,15 -550,17 +409,15 @@@ public class TabletServer extends Abstr } } - private HostAndPort startServer(String address, TProcessor processor) + private ServerAddress startServer(String address, TProcessor processor) throws UnknownHostException { - @SuppressWarnings("deprecation") - var maxMessageSizeProperty = getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE, - Property.TSERV_MAX_MESSAGE_SIZE, Property.GENERAL_MAX_MESSAGE_SIZE); - ServerAddress sp = TServerUtils.startServer(getContext(), address, Property.TSERV_CLIENTPORT, - processor, this.getClass().getSimpleName(), "Thrift Client Server", - Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, - Property.TSERV_THREADCHECK, maxMessageSizeProperty); + ServerAddress sp = + TServerUtils.createThriftServer(getContext(), address, Property.TSERV_CLIENTPORT, processor, + this.getClass().getSimpleName(), Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, + Property.TSERV_MINTHREADS_TIMEOUT, Property.TSERV_THREADCHECK); + sp.startThriftServer("Thrift Client Server"); this.server = sp.server; - return sp.address; + return sp; } private HostAndPort getManagerAddress() { @@@ -465,20 -609,46 +465,20 @@@ ThriftUtil.returnClient(client, context); } - private HostAndPort startTabletClientService() throws UnknownHostException { + private ServerAddress startTabletClientService() throws UnknownHostException { // start listening for client connection last - TransactionWatcher watcher = new TransactionWatcher(context); WriteTracker writeTracker = new WriteTracker(); - clientHandler = newClientHandler(watcher); - thriftClientHandler = newTabletClientHandler(watcher, writeTracker); + clientHandler = newClientHandler(); + thriftClientHandler = newTabletClientHandler(writeTracker); scanClientHandler = newThriftScanClientHandler(writeTracker); - TProcessor processor = ThriftProcessorTypes.getTabletServerTProcessor(this, clientHandler, - thriftClientHandler, scanClientHandler, getContext()); - ServerAddress sp = startServer(getBindAddress(), processor); - log.info("address = {}", sp.address); - return sp; - } - - @Deprecated - private void startReplicationService() throws UnknownHostException { - final var handler = - new org.apache.accumulo.tserver.replication.ReplicationServicerHandler(this); - var processor = ThriftProcessorTypes.getReplicationClientTProcessor(handler, getContext()); - var maxMessageSizeProperty = getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE, - Property.TSERV_MAX_MESSAGE_SIZE, Property.GENERAL_MAX_MESSAGE_SIZE); - ServerAddress sp = TServerUtils.startServer(getContext(), clientAddress.getHost(), - Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, "ReplicationServicerHandler", - "Replication Servicer", Property.TSERV_PORTSEARCH, Property.REPLICATION_MIN_THREADS, null, - Property.REPLICATION_THREADCHECK, maxMessageSizeProperty); - this.replServer = sp.server; - log.info("Started replication service on {}", sp.address); - - try { - // The replication service is unique to the thrift service for a tserver, not just a host. - // Advertise the host and port for replication service given the host and port for the - // tserver. - getContext().getZooReaderWriter().putPersistentData(getContext().getZooKeeperRoot() - + org.apache.accumulo.core.replication.ReplicationConstants.ZOO_TSERVERS + "/" - + clientAddress, sp.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); - } catch (Exception e) { - log.error("Could not advertise replication service port", e); - throw new RuntimeException(e); - } + TProcessor processor = + ThriftProcessorTypes.getTabletServerTProcessor(this, clientHandler, thriftClientHandler, + scanClientHandler, thriftClientHandler, thriftClientHandler, getContext()); - HostAndPort address = startServer(clientAddress.getHost(), processor); - setHostname(address); ++ ServerAddress address = startServer(getBindAddress().toString(), processor); ++ updateAdvertiseAddress(address.address); + log.info("address = {}", address); + return address; } @Override @@@ -823,10 -1120,25 +826,10 @@@ } private void config() { - log.info("Tablet server starting on {}", getHostname()); + log.info("Tablet server starting on {}", getBindAddress()); - Threads.createCriticalThread("Split/MajC initiator", new MajorCompactor(context)).start(); + CompactionWatcher.startWatching(context); - clientAddress = HostAndPort.fromParts(getHostname(), 0); + clientAddress = HostAndPort.fromParts(getBindAddress(), 0); - - final AccumuloConfiguration aconf = getConfiguration(); - - @SuppressWarnings("removal") - Property TSERV_MONITOR_FS = Property.TSERV_MONITOR_FS; - if (aconf.getBoolean(TSERV_MONITOR_FS)) { - log.warn("{} is deprecated and marked for removal.", TSERV_MONITOR_FS.getKey()); - FileSystemMonitor.start(aconf); - } - - Runnable gcDebugTask = () -> gcLogger.logGCInfo(getConfiguration()); - - ScheduledFuture<?> future = context.getScheduledExecutor().scheduleWithFixedDelay(gcDebugTask, - 0, TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS); - watchNonCriticalScheduledTask(future); } public TabletServerStatus getStats(Map<TableId,MapCounter<ScanRunState>> scanCounts) { diff --cc test/src/main/java/org/apache/accumulo/test/functional/AdvertiseAndBindIT.java index 0000000000,2c48718870..8d7f04ce6d mode 000000,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AdvertiseAndBindIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AdvertiseAndBindIT.java @@@ -1,0 -1,300 +1,261 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.accumulo.test.functional; + -import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1; + import static org.junit.jupiter.api.Assertions.assertEquals; + import static org.junit.jupiter.api.Assertions.assertNotNull; + import static org.junit.jupiter.api.Assertions.assertTrue; + + import java.net.InetAddress; + import java.util.ArrayList; + import java.util.HashMap; + import java.util.List; + import java.util.Map; -import java.util.Optional; + import java.util.Set; + -import org.apache.accumulo.coordinator.CompactionCoordinator; ++import org.apache.accumulo.core.client.admin.servers.ServerId; + import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.util.HostAndPort; -import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; + import org.apache.accumulo.minicluster.ServerType; + import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterControl; + import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; -import org.apache.accumulo.test.util.Wait; + import org.apache.hadoop.conf.Configuration; + import org.junit.jupiter.api.Test; + ++import com.google.common.net.HostAndPort; ++ + public class AdvertiseAndBindIT extends ConfigurableMacBase { + + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setNumCompactors(1); - cfg.setNumScanServers(1); - cfg.setNumTservers(1); - cfg.setServerClass(ServerType.COMPACTION_COORDINATOR, CompactionCoordinator.class); ++ cfg.getClusterServerConfiguration().setNumDefaultCompactors(1); ++ cfg.getClusterServerConfiguration().setNumDefaultScanServers(1); ++ cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1); + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "10s"); + } + + @Override + public void setUp() throws Exception { + // Override the setup method so that Mini is + // not started before each test. We are going to + // manage this manually. + } + + @Test + public void testAdvertiseAndBindArguments() throws Exception { + final String localHostName = InetAddress.getLocalHost().getHostName(); + + createMiniAccumulo(); + assertNotNull(cluster); + + // Accumulo will use the default bind address of "0.0.0.0" + // when it's not specified. When the bind address is the + // default, then Accumulo will use the hostname for the + // advertise address. + cluster.start(); - getCluster().getClusterControl().start(ServerType.COMPACTION_COORDINATOR, Map.of(), 1); - getCluster().getClusterControl().start(ServerType.COMPACTOR, Map.of(), 1, - new String[] {"-q", QUEUE1}); - getCluster().getClusterControl().start(ServerType.SCAN_SERVER, Map.of(), 1, - new String[] {"-g", "DEFAULT"}); - Wait.waitFor(() -> !getServerContext().instanceOperations().getCompactors().isEmpty()); - Wait.waitFor(() -> !getServerContext().instanceOperations().getScanServers().isEmpty()); + try { - Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(false); ++ Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(); + zkAddrs.values().forEach(hp -> assertTrue(hp.getHost().equals(localHostName))); + } finally { + cluster.stop(); + Thread.sleep(20_000); // wait 2x the ZK timeout to ensure ZK entries removed + } + + // Set only the bind address - restartClusterWithArguments(null, "127.0.0.1", false); ++ restartClusterWithArguments(null, "127.0.0.1"); + try { - Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(false); ++ Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(); + zkAddrs.values().forEach(hp -> assertTrue(hp.getHost().equals("127.0.0.1"))); + } finally { + cluster.stop(); + Thread.sleep(20_000); // wait 2x the ZK timeout to ensure ZK entries removed + } + + // Set only the advertise address - restartClusterWithArguments("localhost", null, false); ++ restartClusterWithArguments("localhost", null); + try { - Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(false); ++ Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(); + zkAddrs.values().forEach(hp -> assertTrue(hp.getHost().equals("localhost"))); + } finally { + cluster.stop(); + Thread.sleep(20_000); // wait 2x the ZK timeout to ensure ZK entries removed + } + + // Set advertise and bind address - restartClusterWithArguments("localhost", "127.0.0.1", false); ++ restartClusterWithArguments("localhost", "127.0.0.1"); + try { - Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(false); ++ Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(); + zkAddrs.values().forEach(hp -> assertTrue(hp.getHost().equals("localhost"))); + } finally { + cluster.stop(); + } + + // Set advertise with port and bind address + // skip the coordinator because MiniAccumuloClusterControl.start will + // try to connect to it - restartClusterWithArguments("192.168.1.2:59000", "127.0.0.1", true); ++ restartClusterWithArguments("192.168.1.2:59000", "127.0.0.1"); + try { - Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(true); ++ Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(); + zkAddrs.values().forEach(hp -> assertTrue(hp.toString().equals("192.168.1.2:59000"))); + } finally { + cluster.stop(); + } + + } + + @Test + public void testAdvertiseAndBindProperties() throws Exception { + + final String localHostName = InetAddress.getLocalHost().getHostName(); + + createMiniAccumulo(); + assertNotNull(cluster); + + // Accumulo will use the default bind address of "0.0.0.0" + // when it's not specified. When the bind address is the + // default, then Accumulo will use the hostname for the + // advertise address. + cluster.start(); - getCluster().getClusterControl().start(ServerType.COMPACTION_COORDINATOR, Map.of(), 1); - getCluster().getClusterControl().start(ServerType.COMPACTOR, Map.of(), 1, - new String[] {"-q", QUEUE1}); - getCluster().getClusterControl().start(ServerType.SCAN_SERVER, Map.of(), 1, - new String[] {"-g", "DEFAULT"}); - Wait.waitFor(() -> !getServerContext().instanceOperations().getCompactors().isEmpty()); - Wait.waitFor(() -> !getServerContext().instanceOperations().getScanServers().isEmpty()); + try { - Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(false); ++ Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(); + zkAddrs.values().forEach(hp -> assertTrue(hp.getHost().equals(localHostName))); + } finally { + cluster.stop(); + Thread.sleep(20_000); // wait 2x the ZK timeout to ensure ZK entries removed + } + + // Set only the bind address - restartClusterWithProperties(Map.of(Property.RPC_PROCESS_BIND_ADDRESS.getKey(), "127.0.0.1"), - false); ++ restartClusterWithProperties(Map.of(Property.RPC_PROCESS_BIND_ADDRESS.getKey(), "127.0.0.1")); + try { - Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(false); ++ Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(); + zkAddrs.values().forEach(hp -> assertTrue(hp.getHost().equals("127.0.0.1"))); + } finally { + cluster.stop(); + Thread.sleep(20_000); // wait 2x the ZK timeout to ensure ZK entries removed + } + + // Set only the advertise address + restartClusterWithProperties( - Map.of(Property.RPC_PROCESS_ADVERTISE_ADDRESS.getKey(), "localhost"), false); ++ Map.of(Property.RPC_PROCESS_ADVERTISE_ADDRESS.getKey(), "localhost")); + try { - Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(false); ++ Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(); + zkAddrs.values().forEach(hp -> assertTrue(hp.getHost().equals("localhost"))); + } finally { + cluster.stop(); + Thread.sleep(20_000); // wait 2x the ZK timeout to ensure ZK entries removed + } + + // Set advertise and bind address + restartClusterWithProperties(Map.of(Property.RPC_PROCESS_BIND_ADDRESS.getKey(), "127.0.0.1", - Property.RPC_PROCESS_ADVERTISE_ADDRESS.getKey(), "localhost"), false); ++ Property.RPC_PROCESS_ADVERTISE_ADDRESS.getKey(), "localhost")); + try { - Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(false); ++ Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(); + zkAddrs.values().forEach(hp -> assertTrue(hp.getHost().equals("localhost"))); + } finally { + cluster.stop(); + } + + // Set advertise with port and bind address + // skip the coordinator because MiniAccumuloClusterControl.start will + // try to connect to it + restartClusterWithProperties(Map.of(Property.RPC_PROCESS_BIND_ADDRESS.getKey(), "127.0.0.1", - Property.RPC_PROCESS_ADVERTISE_ADDRESS.getKey(), "192.168.1.1:10005"), true); ++ Property.RPC_PROCESS_ADVERTISE_ADDRESS.getKey(), "192.168.1.1:10005")); + try { - Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(true); ++ Map<ServerType,HostAndPort> zkAddrs = getAdvertiseAddressFromZooKeeper(); + zkAddrs.values().forEach(hp -> assertTrue(hp.toString().equals("192.168.1.1:10005"))); + } finally { + cluster.stop(); + } + + } + - private void restartClusterWithArguments(String advertiseAddress, String bindAddress, - boolean skipCoordinator) throws Exception { ++ private void restartClusterWithArguments(String advertiseAddress, String bindAddress) ++ throws Exception { + List<String> args = new ArrayList<>(); + if (advertiseAddress != null) { + args.add("-o"); + args.add(Property.RPC_PROCESS_ADVERTISE_ADDRESS.getKey() + "=" + advertiseAddress); + } + if (bindAddress != null) { + args.add("-o"); + args.add(Property.RPC_PROCESS_BIND_ADDRESS.getKey() + "=" + bindAddress); + } + // cluster.start will not end up overwriting the accumulo.properties file + // with any property changes after the initial start. The only way to pass + // new or updated property settings on a process restart is to use the + // start method that takes configuration overrides. + MiniAccumuloClusterControl control = getCluster().getClusterControl(); + control.start(ServerType.ZOOKEEPER); - control.start(ServerType.TABLET_SERVER, Map.of(), 1, args.toArray(new String[] {})); - control.start(ServerType.MANAGER, Map.of(), 1, args.toArray(new String[] {})); - control.start(ServerType.GARBAGE_COLLECTOR, Map.of(), 1, args.toArray(new String[] {})); ++ control.start(ServerType.TABLET_SERVER, Map.of(), 1, null, args.toArray(new String[] {})); ++ control.start(ServerType.MANAGER, Map.of(), 1, null, args.toArray(new String[] {})); ++ control.start(ServerType.GARBAGE_COLLECTOR, Map.of(), 1, null, args.toArray(new String[] {})); ++ control.start(ServerType.COMPACTOR, Map.of(), 1, null, args.toArray(new String[] {})); ++ control.start(ServerType.SCAN_SERVER, Map.of(), 1, null, args.toArray(new String[] {})); + // Calling cluster.start here will set the Manager goal state + // and call verifyUp + cluster.start(); - if (!skipCoordinator) { - control.start(ServerType.COMPACTION_COORDINATOR, Map.of(), 1, args.toArray(new String[] {})); - } - List<String> compactorArgs = new ArrayList<>(args); - compactorArgs.add("-q"); - compactorArgs.add(QUEUE1); - control.start(ServerType.COMPACTOR, Map.of(), 1, compactorArgs.toArray(new String[] {})); - List<String> sserverArgs = new ArrayList<>(args); - sserverArgs.add("-g"); - sserverArgs.add("DEFAULT"); - control.start(ServerType.SCAN_SERVER, Map.of(), 1, sserverArgs.toArray(new String[] {})); - Wait.waitFor(() -> !getServerContext().instanceOperations().getCompactors().isEmpty()); - Wait.waitFor(() -> !getServerContext().instanceOperations().getScanServers().isEmpty()); + } + - private void restartClusterWithProperties(Map<String,String> properties, boolean skipCoordinator) - throws Exception { ++ private void restartClusterWithProperties(Map<String,String> properties) throws Exception { + // cluster.start will not end up overwriting the accumulo.properties file + // with any property changes after the initial start. The only way to pass + // new or updated property settings on a process restart is to use the + // start method that takes configuration overrides. + MiniAccumuloClusterControl control = getCluster().getClusterControl(); + control.start(ServerType.ZOOKEEPER); + control.start(ServerType.TABLET_SERVER, properties, 1); + control.start(ServerType.MANAGER, properties, 1); + control.start(ServerType.GARBAGE_COLLECTOR, properties, 1); ++ control.start(ServerType.COMPACTOR, properties, 1); ++ control.start(ServerType.SCAN_SERVER, properties, 1); + // Calling cluster.start here will set the Manager goal state + // and call verifyUp + cluster.start(); - if (!skipCoordinator) { - control.start(ServerType.COMPACTION_COORDINATOR, properties, 1); - } - control.start(ServerType.COMPACTOR, properties, 1, new String[] {"-q", QUEUE1}); - control.start(ServerType.SCAN_SERVER, properties, 1, new String[] {"-g", "DEFAULT"}); - Wait.waitFor(() -> !getServerContext().instanceOperations().getCompactors().isEmpty()); - Wait.waitFor(() -> !getServerContext().instanceOperations().getScanServers().isEmpty()); + } + - private Map<ServerType,HostAndPort> getAdvertiseAddressFromZooKeeper(boolean skipCoordinator) ++ private Map<ServerType,HostAndPort> getAdvertiseAddressFromZooKeeper() + throws InterruptedException { + Map<ServerType,HostAndPort> addresses = new HashMap<>(); + - List<String> mgrs = getServerContext().instanceOperations().getManagerLocations(); ++ Set<ServerId> mgrs = getServerContext().instanceOperations().getServers(ServerId.Type.MANAGER); + assertEquals(1, mgrs.size()); - addresses.put(ServerType.MANAGER, HostAndPort.fromString(mgrs.get((0)))); - - if (!skipCoordinator) { - Optional<HostAndPort> coordAddr = - ExternalCompactionUtil.findCompactionCoordinator(getServerContext()); - while (coordAddr.isEmpty()) { - Thread.sleep(50); - coordAddr = ExternalCompactionUtil.findCompactionCoordinator(getServerContext()); - } - addresses.put(ServerType.COMPACTION_COORDINATOR, coordAddr.orElseThrow()); - } ++ addresses.put(ServerType.MANAGER, ++ HostAndPort.fromString(mgrs.iterator().next().toHostPortString())); + - List<String> tservers = getServerContext().instanceOperations().getTabletServers(); ++ Set<ServerId> tservers = ++ getServerContext().instanceOperations().getServers(ServerId.Type.TABLET_SERVER); + assertEquals(1, tservers.size()); - addresses.put(ServerType.TABLET_SERVER, HostAndPort.fromString(tservers.get((0)))); ++ addresses.put(ServerType.TABLET_SERVER, ++ HostAndPort.fromString(tservers.iterator().next().toHostPortString())); + - Set<String> compactors = getServerContext().instanceOperations().getCompactors(); ++ Set<ServerId> compactors = ++ getServerContext().instanceOperations().getServers(ServerId.Type.COMPACTOR); + assertEquals(1, compactors.size()); - addresses.put(ServerType.COMPACTOR, HostAndPort.fromString(compactors.iterator().next())); ++ addresses.put(ServerType.COMPACTOR, ++ HostAndPort.fromString(compactors.iterator().next().toHostPortString())); + - Set<String> sservers = getServerContext().instanceOperations().getScanServers(); ++ Set<ServerId> sservers = ++ getServerContext().instanceOperations().getServers(ServerId.Type.SCAN_SERVER); + assertEquals(1, sservers.size()); - addresses.put(ServerType.SCAN_SERVER, HostAndPort.fromString(sservers.iterator().next())); ++ addresses.put(ServerType.SCAN_SERVER, ++ HostAndPort.fromString(sservers.iterator().next().toHostPortString())); + + return addresses; + } + + } diff --cc test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java index 7853f94b52,09de1b2fb2..0cf23c0759 --- a/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java @@@ -149,12 -147,8 +149,12 @@@ public class ConfigurableMacBase extend lastException); } + public ClusterServerConfiguration getMiniClusterDescription() { + return new ClusterServerConfiguration(); + } + @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test") - private void createMiniAccumulo() throws Exception { + protected void createMiniAccumulo() throws Exception { // createTestDir will give us a empty directory, we don't need to clean it up ourselves File baseDir = createTestDir(this.getClass().getName() + "_" + this.testName()); MiniAccumuloConfigImpl cfg = new MiniAccumuloConfigImpl(baseDir, ROOT_PASSWORD);
