Repository: accumulo Updated Branches: refs/heads/master 32a39cb1d -> e900e6742
ACCUMULO-4424 Start thrift servers on master/monitor immediately. Includes an integration test for master, monitor and gc. Adds another proxy around the Thrift processor to prevent calls to a standby service from succeeding. Closes apache/accumulo#150 Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e900e674 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e900e674 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e900e674 Branch: refs/heads/master Commit: e900e67425d950bd4c0c5288a6270d7b362ac458 Parents: 32a39cb Author: Josh Elser <els...@apache.org> Authored: Mon Sep 12 22:29:37 2016 -0400 Committer: Josh Elser <els...@apache.org> Committed: Mon Sep 12 22:29:37 2016 -0400 ---------------------------------------------------------------------- .../impl/MiniAccumuloClusterImpl.java | 2 +- .../accumulo/server/HighlyAvailableService.java | 32 +++ ...HighlyAvailableServiceInvocationHandler.java | 48 ++++ .../rpc/HighlyAvailableServiceWrapper.java | 48 ++++ .../server/rpc/NotActiveServiceException.java | 26 ++ .../accumulo/gc/SimpleGarbageCollector.java | 3 + .../java/org/apache/accumulo/master/Master.java | 65 +++-- .../org/apache/accumulo/monitor/Monitor.java | 29 +- .../accumulo/monitor/servlets/BasicServlet.java | 16 ++ .../monitor/servlets/DefaultServlet.java | 6 + .../monitor/servlets/OperationServlet.java | 6 + .../accumulo/monitor/servlets/ShellServlet.java | 6 + .../ThriftServerBindsBeforeZooKeeperLockIT.java | 267 +++++++++++++++++++ 13 files changed, 522 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java ---------------------------------------------------------------------- diff --git
a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java index e769b30..b8a0f64 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java @@ -344,7 +344,7 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { return process; } - Process _exec(Class<?> clazz, ServerType serverType, Map<String,String> configOverrides, String... args) throws IOException { + public Process _exec(Class<?> clazz, ServerType serverType, Map<String,String> configOverrides, String... args) throws IOException { List<String> jvmOpts = new ArrayList<>(); jvmOpts.add("-Xmx" + config.getMemory(serverType)); if (configOverrides != null && !configOverrides.isEmpty()) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/base/src/main/java/org/apache/accumulo/server/HighlyAvailableService.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/HighlyAvailableService.java b/server/base/src/main/java/org/apache/accumulo/server/HighlyAvailableService.java new file mode 100644 index 0000000..74fc2f4 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/HighlyAvailableService.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server; + +/** + * This interface allows service implementations which support running multiple instances concurrently with only one active instance to report whether or not + * they are the active service. + */ +public interface HighlyAvailableService { + + /** + * Is this service instance currently the active instance for the Accumulo cluster. + * + * @return True if the service is the active service, false otherwise. + */ + boolean isActiveService(); + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java new file mode 100644 index 0000000..2e8aa63 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.rpc; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.util.Objects; + +import org.apache.accumulo.server.HighlyAvailableService; + +/** + * An {@link InvocationHandler} which checks to see if a {@link HighlyAvailableService} is the current active instance of that service, throwing + * {@link NotActiveServiceException} when it is not the current active instance. + */ +public class HighlyAvailableServiceInvocationHandler<I> implements InvocationHandler { + + private final I instance; + private final HighlyAvailableService service; + + public HighlyAvailableServiceInvocationHandler(I instance, HighlyAvailableService service) { + this.instance = Objects.requireNonNull(instance); + this.service = Objects.requireNonNull(service); + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + // If the service is not active, throw an exception + if (!service.isActiveService()) { + throw new NotActiveServiceException(); + } + // Otherwise, call the real method + return method.invoke(instance, args); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceWrapper.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceWrapper.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceWrapper.java new file 
mode 100644 index 0000000..89d7ed4 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceWrapper.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.rpc; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Proxy; + +import org.apache.accumulo.server.HighlyAvailableService; + +/** + * A class to wrap invocations to the Thrift handler to prevent these invocations from succeeding when the Accumulo service that this Thrift service is for has + * not yet obtained its ZooKeeper lock. + * + * @since 2.0 + */ +public class HighlyAvailableServiceWrapper { + + private static final HighlyAvailableServiceWrapper INSTANCE = new HighlyAvailableServiceWrapper(); + + // Not for public use. 
+ private HighlyAvailableServiceWrapper() {} + + public static <I> I service(final I instance, HighlyAvailableService service) { + InvocationHandler handler = INSTANCE.getInvocationHandler(instance, service); + + @SuppressWarnings("unchecked") + I proxiedInstance = (I) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler); + return proxiedInstance; + } + + protected <T> HighlyAvailableServiceInvocationHandler<T> getInvocationHandler(final T instance, final HighlyAvailableService service) { + return new HighlyAvailableServiceInvocationHandler<T>(instance, service); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/base/src/main/java/org/apache/accumulo/server/rpc/NotActiveServiceException.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/NotActiveServiceException.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/NotActiveServiceException.java new file mode 100644 index 0000000..6846b67 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/NotActiveServiceException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.rpc; + +/** + * An {@link Exception} which denotes that the service which was invoked is not the active instance for that service in the Accumulo cluster. + */ +public class NotActiveServiceException extends Exception { + + private static final long serialVersionUID = 1L; + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index d9d0f2f..5805272 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -113,6 +113,9 @@ import com.google.common.collect.Maps; import com.google.common.net.HostAndPort; import com.google.protobuf.InvalidProtocolBufferException; +// Could/Should implement HighlyAvailableService, but the Thrift server is already started before +// the ZK lock is acquired. The server is only used for metrics, so there is no concern about clients +// using the service before the lock is acquired.
public class SimpleGarbageCollector extends AccumuloServerContext implements Iface { private static final Text EMPTY_TEXT = new Text(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index b7f6e1a..94f51cd 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -95,6 +95,7 @@ import org.apache.accumulo.master.replication.WorkDriver; import org.apache.accumulo.master.state.TableCounts; import org.apache.accumulo.server.Accumulo; import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.accumulo.server.HighlyAvailableService; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.ServerOpts; import org.apache.accumulo.server.client.HdfsZooInstance; @@ -126,6 +127,7 @@ import org.apache.accumulo.server.metrics.Metrics; import org.apache.accumulo.server.replication.ZooKeeperInitialization; import org.apache.accumulo.server.rpc.RpcWrapper; import org.apache.accumulo.server.rpc.ServerAddress; +import org.apache.accumulo.server.rpc.HighlyAvailableServiceWrapper; import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftServerType; @@ -169,7 +171,7 @@ import com.google.common.collect.Iterables; * * The master will also coordinate log recoveries and reports general status. 
*/ -public class Master extends AccumuloServerContext implements LiveTServerSet.Listener, TableObserver, CurrentState { +public class Master extends AccumuloServerContext implements LiveTServerSet.Listener, TableObserver, CurrentState, HighlyAvailableService { final static Logger log = LoggerFactory.getLogger(Master.class); @@ -1132,6 +1134,37 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List public void run() throws IOException, InterruptedException, KeeperException { final String zroot = ZooUtil.getRoot(getInstance()); + // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a sign of process health when a hot-standby + // + // Start the Master's Client service + clientHandler = new MasterClientServiceHandler(this); + // Ensure that calls before the master gets the lock fail + Iface haProxy = HighlyAvailableServiceWrapper.service(clientHandler, this); + Iface rpcProxy = RpcWrapper.service(clientHandler, new Processor<Iface>(haProxy)); + final Processor<Iface> processor; + if (ThriftServerType.SASL == getThriftServerType()) { + Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clientHandler.getClass(), getConfiguration()); + processor = new Processor<>(tcredsProxy); + } else { + processor = new Processor<>(rpcProxy); + } + ServerAddress sa = TServerUtils.startServer(this, hostname, Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, + Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); + clientService = sa.server; + log.info("Started Master client service at {}", sa.address); + + // Start the replication coordinator which assigns tservers to service replication requests + MasterReplicationCoordinator impl = new MasterReplicationCoordinator(this); + ReplicationCoordinator.Iface haReplicationProxy = HighlyAvailableServiceWrapper.service(impl, this); + ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> 
replicationCoordinatorProcessor = new ReplicationCoordinator.Processor<>(RpcWrapper.service( + impl, new ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(haReplicationProxy))); + ServerAddress replAddress = TServerUtils.startServer(this, hostname, Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor, + "Master Replication Coordinator", "Replication Coordinator", null, Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS, + Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); + + log.info("Started replication coordinator service at " + replAddress.address); + + // block until we can obtain the ZK lock for the master getMasterLock(zroot + Constants.ZMASTER_LOCK); recoveryManager = new RecoveryManager(this); @@ -1235,18 +1268,6 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List log.info("AuthenticationTokenSecretManager is initialized"); } - clientHandler = new MasterClientServiceHandler(this); - Iface rpcProxy = RpcWrapper.service(clientHandler, new Processor<Iface>(clientHandler)); - final Processor<Iface> processor; - if (ThriftServerType.SASL == getThriftServerType()) { - Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clientHandler.getClass(), getConfiguration()); - processor = new Processor<>(tcredsProxy); - } else { - processor = new Processor<>(rpcProxy); - } - ServerAddress sa = TServerUtils.startServer(this, hostname, Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, - Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); - clientService = sa.server; String address = sa.address.toString(); log.info("Setting master lock data to " + address); masterLock.replaceLockData(address.getBytes()); @@ -1268,16 +1289,6 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List } replicationWorkAssigner.start(); - // Start the replication 
coordinator which assigns tservers to service replication requests - MasterReplicationCoordinator impl = new MasterReplicationCoordinator(this); - ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> replicationCoordinatorProcessor = new ReplicationCoordinator.Processor<>(RpcWrapper.service( - impl, new ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(impl))); - ServerAddress replAddress = TServerUtils.startServer(this, hostname, Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor, - "Master Replication Coordinator", "Replication Coordinator", null, Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS, - Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); - - log.info("Started replication coordinator service at " + replAddress.address); - // Advertise that port we used so peers don't have to be told what it is ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, replAddress.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); @@ -1680,4 +1691,12 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List public Long getSteadyTime() { return timeKeeper.getTime(); } + + @Override + public boolean isActiveService() { + if (null != masterLock) { + return masterLock.isLocked(); + } + return false; + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 42b9bd1..1e8feaa 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -81,6 +81,7 @@ import 
org.apache.accumulo.monitor.servlets.trace.ShowTrace; import org.apache.accumulo.monitor.servlets.trace.Summary; import org.apache.accumulo.server.Accumulo; import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.accumulo.server.HighlyAvailableService; import org.apache.accumulo.server.ServerOpts; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfigurationFactory; @@ -104,7 +105,7 @@ import com.google.common.net.HostAndPort; /** * Serve master statistics with an embedded web server. */ -public class Monitor { +public class Monitor implements HighlyAvailableService { private static final Logger log = LoggerFactory.getLogger(Monitor.class); private static final int REFRESH_TIME = 5; @@ -117,6 +118,7 @@ public class Monitor { private static long totalHoldTime = 0; private static long totalLookups = 0; private static int totalTables = 0; + public static HighlyAvailableService HA_SERVICE_INSTANCE = null; private static class MaxList<T> extends LinkedList<Pair<Long,T>> { private static final long serialVersionUID = 1L; @@ -436,6 +438,9 @@ public class Monitor { log.info("Instance " + instance.getInstanceID()); Accumulo.init(fs, config, app); Monitor monitor = new Monitor(); + // Servlets need access to limit requests when the monitor is not active, but Servlets are instantiated + // via reflection. Expose the service this way instead. 
+ Monitor.HA_SERVICE_INSTANCE = monitor; DistributedTrace.enable(hostname, app, config.getConfiguration()); try { monitor.run(hostname); @@ -447,13 +452,6 @@ public class Monitor { private static long START_TIME; public void run(String hostname) { - try { - getMonitorLock(); - } catch (Exception e) { - log.error("Failed to get Monitor ZooKeeper lock"); - throw new RuntimeException(e); - } - Monitor.START_TIME = System.currentTimeMillis(); int ports[] = config.getConfiguration().getPort(Property.MONITOR_PORT); for (int port : ports) { @@ -490,6 +488,13 @@ public class Monitor { } try { + getMonitorLock(); + } catch (Exception e) { + log.error("Failed to get Monitor ZooKeeper lock"); + throw new RuntimeException(e); + } + + try { log.debug("Using " + hostname + " to advertise monitor location in ZooKeeper"); String monitorAddress = HostAndPort.fromParts(hostname, server.getPort()).toString(); @@ -837,4 +842,12 @@ public class Monitor { public static AccumuloServerContext getContext() { return context; } + + @Override + public boolean isActiveService() { + if (null != monitorLock && monitorLock.isLocked()) { + return true; + } + return false; + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java index fc329b8..6a8b82a 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java @@ -38,10 +38,12 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.server.monitor.DedupedLogEvent; import org.apache.accumulo.server.monitor.LogService; 
+import org.apache.commons.httpclient.util.HttpURLConnection; import org.apache.log4j.Level; import org.apache.log4j.Logger; abstract public class BasicServlet extends HttpServlet { + public static final String STANDBY_MONITOR_MESSAGE = "This is not the active Monitor"; private static final long serialVersionUID = 1L; protected static final Logger log = Logger.getLogger(BasicServlet.class); @@ -51,8 +53,22 @@ abstract public class BasicServlet extends HttpServlet { abstract protected String getTitle(HttpServletRequest req); + public boolean isActiveMonitor() { + // If the HighlyAvailableService is not initialized or it's not the active service, throw an exception + // to prevent processing of the servlet. + if (null == Monitor.HA_SERVICE_INSTANCE || !Monitor.HA_SERVICE_INSTANCE.isActiveService()) { + return false; + } + return true; + } + @Override public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + // Verify that this is the active Monitor instance + if (!isActiveMonitor()) { + resp.sendError(HttpURLConnection.HTTP_UNAVAILABLE, STANDBY_MONITOR_MESSAGE); + return; + } StringBuilder sb = new StringBuilder(); try { Monitor.fetchData(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java index 383d7bc..85c60fb 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java @@ -39,6 +39,7 @@ import org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.monitor.ZooKeeperStatus; import 
org.apache.accumulo.monitor.ZooKeeperStatus.ZooKeeperState; import org.apache.accumulo.monitor.util.celltypes.NumberType; +import org.apache.commons.httpclient.util.HttpURLConnection; public class DefaultServlet extends BasicServlet { @@ -83,6 +84,11 @@ public class DefaultServlet extends BasicServlet { @Override public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + // Verify that this is the active Monitor instance + if (!isActiveMonitor()) { + resp.sendError(HttpURLConnection.HTTP_UNAVAILABLE, STANDBY_MONITOR_MESSAGE); + return; + } if (req.getRequestURI().startsWith("/web")) getResource(req, resp); else if (req.getRequestURI().startsWith("/monitor")) http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java index a3dbe8c..412ae2e 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java @@ -31,6 +31,7 @@ import org.apache.accumulo.server.master.state.DeadServerList; import org.apache.accumulo.server.monitor.LogService; import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.problems.ProblemType; +import org.apache.commons.httpclient.util.HttpURLConnection; import org.apache.log4j.Logger; public class OperationServlet extends BasicServlet { @@ -44,6 +45,11 @@ public class OperationServlet extends BasicServlet { @Override public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException { + // Verify that this is the active Monitor instance + if (!isActiveMonitor()) { + 
resp.sendError(HttpURLConnection.HTTP_UNAVAILABLE, STANDBY_MONITOR_MESSAGE); + return; + } String redir = null; List<Cookie> cookiesToSet = Collections.emptyList(); try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java index 31bea15..1dc8422 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java @@ -33,6 +33,7 @@ import javax.servlet.http.HttpSession; import jline.console.ConsoleReader; import org.apache.accumulo.shell.Shell; +import org.apache.commons.httpclient.util.HttpURLConnection; public class ShellServlet extends BasicServlet { private static final long serialVersionUID = 1L; @@ -179,6 +180,11 @@ public class ShellServlet extends BasicServlet { @Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + // Verify that this is the active Monitor instance + if (!isActiveMonitor()) { + resp.sendError(HttpURLConnection.HTTP_UNAVAILABLE, STANDBY_MONITOR_MESSAGE); + return; + } final HttpSession session = req.getSession(true); String user = (String) session.getAttribute("user"); if (user == null || !userShells().containsKey(session.getId())) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/test/src/main/java/org/apache/accumulo/test/ThriftServerBindsBeforeZooKeeperLockIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/ThriftServerBindsBeforeZooKeeperLockIT.java 
b/test/src/main/java/org/apache/accumulo/test/ThriftServerBindsBeforeZooKeeperLockIT.java new file mode 100644 index 0000000..2890fc7 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/ThriftServerBindsBeforeZooKeeperLockIT.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.accumulo.test;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.Socket;
import java.net.URL;
import java.util.Collection;
import java.util.List;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.util.MonitorUtil;
import org.apache.accumulo.fate.zookeeper.ZooReader;
import org.apache.accumulo.gc.SimpleGarbageCollector;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
import org.apache.accumulo.minicluster.impl.ProcessReference;
import org.apache.accumulo.monitor.Monitor;
import org.apache.accumulo.monitor.servlets.BasicServlet;
import org.apache.accumulo.server.util.PortUtils;
import org.apache.accumulo.test.categories.MiniClusterOnlyTests;
import org.apache.accumulo.test.functional.FunctionalTestUtils;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableMap;

/**
 * Test class that verifies "HA-capable" servers put up their thrift servers before acquiring their ZK lock.
 * <p>
 * Each test waits until an active instance of a service exists, then launches a second (standby) instance of the same service on a random port and polls
 * that port until the standby answers.
 */
@Category({MiniClusterOnlyTests.class})
public class ThriftServerBindsBeforeZooKeeperLockIT extends AccumuloClusterHarness {
  private static final Logger LOG = LoggerFactory.getLogger(ThriftServerBindsBeforeZooKeeperLockIT.class);

  /** Delay between polling attempts, in milliseconds. */
  private static final long RETRY_DELAY_MS = 1000;

  /** Session timeout for the ZooKeeper reader used to look for lock holders, in milliseconds. */
  private static final int ZK_TIMEOUT_MS = 30000;

  /**
   * A single attempt at checking a standby server listening on {@code port}. Returning {@code false} or throwing means "not yet, try again later".
   */
  @FunctionalInterface
  private interface StandbyProbe {
    boolean succeeded(int port) throws Exception;
  }

  /**
   * Restricts this test to MiniAccumuloCluster, because it launches extra server processes through {@link MiniAccumuloClusterImpl}.
   */
  public boolean canRunTest(ClusterType type) {
    return ClusterType.MINI == type;
  }

  @Test
  public void testMonitorService() throws Exception {
    final MiniAccumuloClusterImpl cluster = (MiniAccumuloClusterImpl) getCluster();
    Collection<ProcessReference> monitors = cluster.getProcesses().get(ServerType.MONITOR);
    // Need to start one monitor and let it become active.
    if (null == monitors || monitors.isEmpty()) {
      getClusterControl().start(ServerType.MONITOR, "localhost");
    }

    final ZooKeeperInstance inst = new ZooKeeperInstance(cluster.getClientConfig());
    // Wait until the active monitor has advertised its location; sleep on every miss, not only on exceptions.
    while (true) {
      try {
        if (null != MonitorUtil.getLocation(inst)) {
          break;
        }
        LOG.debug("No active monitor location advertised yet, retrying");
      } catch (Exception e) {
        LOG.debug("Failed to find active monitor location, retrying", e);
      }
      Thread.sleep(RETRY_DELAY_MS);
    }

    LOG.debug("Found active monitor");

    startStandbyUntilProbeSucceeds(cluster, ServerType.MONITOR, "monitor", ThriftServerBindsBeforeZooKeeperLockIT::monitorReportsStandby);
  }

  @Test
  public void testMasterService() throws Exception {
    final MiniAccumuloClusterImpl cluster = (MiniAccumuloClusterImpl) getCluster();
    final ZooKeeperInstance inst = new ZooKeeperInstance(cluster.getClientConfig());

    // Wait for the Master to grab its lock
    waitForLockHolder(inst, Constants.ZMASTER_LOCK, "master");

    LOG.debug("Found active master");

    startStandbyUntilProbeSucceeds(cluster, ServerType.MASTER, "master", ThriftServerBindsBeforeZooKeeperLockIT::acceptsConnections);
  }

  @Test
  public void testGarbageCollectorPorts() throws Exception {
    final MiniAccumuloClusterImpl cluster = (MiniAccumuloClusterImpl) getCluster();
    final ZooKeeperInstance inst = new ZooKeeperInstance(cluster.getClientConfig());

    // Wait for the GC to grab its lock
    waitForLockHolder(inst, Constants.ZGC_LOCK, "gc");

    LOG.debug("Found active gc");

    startStandbyUntilProbeSucceeds(cluster, ServerType.GARBAGE_COLLECTOR, "gc", ThriftServerBindsBeforeZooKeeperLockIT::acceptsConnections);
  }

  /**
   * Blocks until the ZooKeeper node {@code lockNode} (appended to {@code Constants.ZROOT + "/" + instanceId}) has at least one child. Sleeps
   * {@link #RETRY_DELAY_MS} between every unsuccessful attempt, whether it failed with an exception or simply found no children.
   *
   * @param inst
   *          instance used to locate ZooKeeper and the instance id
   * @param lockNode
   *          lock path suffix, e.g. {@code Constants.ZMASTER_LOCK}
   * @param name
   *          service name used in log messages
   */
  private static void waitForLockHolder(ZooKeeperInstance inst, String lockNode, String name) throws InterruptedException {
    final ZooReader reader = new ZooReader(inst.getZooKeepers(), ZK_TIMEOUT_MS);
    while (true) {
      try {
        // Path is computed inside the try so a transient failure resolving the instance id is retried, as before.
        List<String> locks = reader.getChildren(Constants.ZROOT + "/" + inst.getInstanceID() + lockNode);
        if (!locks.isEmpty()) {
          return;
        }
        LOG.debug("No {} holds the lock yet, retrying", name);
      } catch (Exception e) {
        LOG.debug("Failed to find active {} location, retrying", name, e);
      }
      Thread.sleep(RETRY_DELAY_MS);
    }
  }

  /**
   * Starts a standby {@code serverType} process on a random free port and polls it with {@code probe} until the probe succeeds. If the process dies (for
   * example because the "free" port was taken before the process could bind it), a replacement is started on a new port. The current process is forcibly
   * destroyed when this method exits, normally or exceptionally.
   */
  private void startStandbyUntilProbeSucceeds(MiniAccumuloClusterImpl cluster, ServerType serverType, String name, StandbyProbe probe) throws Exception {
    int port = PortUtils.getRandomFreePort();
    LOG.debug("Starting standby {} on {}", name, port);
    Process process = startProcess(cluster, serverType, port);
    try {
      while (true) {
        try {
          if (probe.succeeded(port)) {
            // Pass
            return;
          }
        } catch (Exception e) {
          LOG.debug("Caught exception trying to reach standby {}", name, e);
        }
        // Wait before trying again
        Thread.sleep(RETRY_DELAY_MS);
        // Make sure the process is still up. Possible the "randomFreePort" we got wasn't actually free and the process
        // died trying to bind it. Pick a new port and restart it in that case.
        if (!process.isAlive()) {
          port = PortUtils.getRandomFreePort();
          LOG.debug("Standby {} died, restarting it listening on {}", name, port);
          process = startProcess(cluster, serverType, port);
        }
      }
    } finally {
      process.destroyForcibly();
    }
  }

  /**
   * Returns true when the monitor on {@code port} answers with HTTP 503 and a body containing {@link BasicServlet#STANDBY_MONITOR_MESSAGE}. The connection is
   * always released.
   */
  private static boolean monitorReportsStandby(int port) throws Exception {
    final HttpURLConnection cnxn = (HttpURLConnection) new URL("http://localhost:" + port).openConnection();
    try {
      final int responseCode = cnxn.getResponseCode();
      // getErrorStream() returns null when the server did not send an error body; a null resource is skipped on close.
      try (InputStream errorStream = cnxn.getErrorStream()) {
        final String errorText = null == errorStream ? null : FunctionalTestUtils.readAll(errorStream);
        // This is our "assertion", but we want to re-check it if it's not what we expect
        if (HttpURLConnection.HTTP_UNAVAILABLE == responseCode && null != errorText && errorText.contains(BasicServlet.STANDBY_MONITOR_MESSAGE)) {
          return true;
        }
        LOG.debug("Unexpected responseCode and/or error text, will retry: '{}' '{}'", responseCode, errorText);
        return false;
      }
    } finally {
      cnxn.disconnect();
    }
  }

  /**
   * Returns true if a TCP connection to {@code localhost:port} can be opened; the socket is closed immediately. A refused connection surfaces as an exception.
   */
  private static boolean acceptsConnections(int port) throws IOException {
    // A Socket whose constructor returns normally is already connected.
    try (Socket s = new Socket("localhost", port)) {
      return s.isConnected();
    }
  }

  /**
   * Launches a server process of the given type with its listening-port property overridden to {@code port}.
   *
   * @throws IllegalArgumentException
   *           if {@code serverType} is not the monitor, master or garbage collector
   */
  private Process startProcess(MiniAccumuloClusterImpl cluster, ServerType serverType, int port) throws IOException {
    final Property property;
    final Class<?> service;
    switch (serverType) {
      case MONITOR:
        property = Property.MONITOR_PORT;
        service = Monitor.class;
        break;
      case MASTER:
        property = Property.MASTER_CLIENTPORT;
        service = Master.class;
        break;
      case GARBAGE_COLLECTOR:
        property = Property.GC_PORT;
        service = SimpleGarbageCollector.class;
        break;
      default:
        throw new IllegalArgumentException("Irrelevant server type for test");
    }

    return cluster._exec(service, serverType, ImmutableMap.of(property.getKey(), Integer.toString(port)));
  }
}