Repository: accumulo
Updated Branches:
  refs/heads/master 32a39cb1d -> e900e6742


ACCUMULO-4424 Start thrift servers on master/monitor immediately

Includes an integration test for master, monitor and gc. Adds another
proxy around the Thrift processor to prevent calls from succeeding
to a standby service.

Closes apache/accumulo#150


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e900e674
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e900e674
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e900e674

Branch: refs/heads/master
Commit: e900e67425d950bd4c0c5288a6270d7b362ac458
Parents: 32a39cb
Author: Josh Elser <els...@apache.org>
Authored: Mon Sep 12 22:29:37 2016 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Mon Sep 12 22:29:37 2016 -0400

----------------------------------------------------------------------
 .../impl/MiniAccumuloClusterImpl.java           |   2 +-
 .../accumulo/server/HighlyAvailableService.java |  32 +++
 ...HighlyAvailableServiceInvocationHandler.java |  48 ++++
 .../rpc/HighlyAvailableServiceWrapper.java      |  48 ++++
 .../server/rpc/NotActiveServiceException.java   |  26 ++
 .../accumulo/gc/SimpleGarbageCollector.java     |   3 +
 .../java/org/apache/accumulo/master/Master.java |  65 +++--
 .../org/apache/accumulo/monitor/Monitor.java    |  29 +-
 .../accumulo/monitor/servlets/BasicServlet.java |  16 ++
 .../monitor/servlets/DefaultServlet.java        |   6 +
 .../monitor/servlets/OperationServlet.java      |   6 +
 .../accumulo/monitor/servlets/ShellServlet.java |   6 +
 .../ThriftServerBindsBeforeZooKeeperLockIT.java | 267 +++++++++++++++++++
 13 files changed, 522 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
----------------------------------------------------------------------
diff --git 
a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
 
b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
index e769b30..b8a0f64 100644
--- 
a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
+++ 
b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
@@ -344,7 +344,7 @@ public class MiniAccumuloClusterImpl implements 
AccumuloCluster {
     return process;
   }
 
-  Process _exec(Class<?> clazz, ServerType serverType, Map<String,String> 
configOverrides, String... args) throws IOException {
+  public Process _exec(Class<?> clazz, ServerType serverType, 
Map<String,String> configOverrides, String... args) throws IOException {
     List<String> jvmOpts = new ArrayList<>();
     jvmOpts.add("-Xmx" + config.getMemory(serverType));
     if (configOverrides != null && !configOverrides.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/base/src/main/java/org/apache/accumulo/server/HighlyAvailableService.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/HighlyAvailableService.java
 
b/server/base/src/main/java/org/apache/accumulo/server/HighlyAvailableService.java
new file mode 100644
index 0000000..74fc2f4
--- /dev/null
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/HighlyAvailableService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+/**
+ * This interface allows service implementations which support running 
multiple instances concurrently with only one active instance to report whether 
or not
+ * they are the active service.
+ */
+public interface HighlyAvailableService {
+
+  /**
+   * Is this service instance currently the active instance for the Accumulo 
cluster.
+   *
+   * @return True if the service is the active service, false otherwise.
+   */
+  boolean isActiveService();
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java
 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java
new file mode 100644
index 0000000..2e8aa63
--- /dev/null
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceInvocationHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.util.Objects;
+
+import org.apache.accumulo.server.HighlyAvailableService;
+
+/**
+ * An {@link InvocationHandler} which checks to see if a {@link 
HighlyAvailableService} is the current active instance of that service, throwing
+ * {@link NotActiveServiceException} when it is not the current active 
instance.
+ */
+public class HighlyAvailableServiceInvocationHandler<I> implements 
InvocationHandler {
+
+  private final I instance;
+  private final HighlyAvailableService service;
+
+  public HighlyAvailableServiceInvocationHandler(I instance, 
HighlyAvailableService service) {
+    this.instance = Objects.requireNonNull(instance);
+    this.service = Objects.requireNonNull(service);
+  }
+
+  @Override
+  public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+    // If the service is not active, throw an exception
+    if (!service.isActiveService()) {
+      throw new NotActiveServiceException();
+    }
+    // Otherwise, call the real method
+    return method.invoke(instance, args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceWrapper.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceWrapper.java
 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceWrapper.java
new file mode 100644
index 0000000..89d7ed4
--- /dev/null
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/HighlyAvailableServiceWrapper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+
+import org.apache.accumulo.server.HighlyAvailableService;
+
+/**
+ * A class to wrap invocations to the Thrift handler to prevent these 
invocations from succeeding when the Accumulo service that this Thrift service 
is for has
+ * not yet obtained its ZooKeeper lock.
+ *
+ * @since 2.0
+ */
+public class HighlyAvailableServiceWrapper {
+
+  private static final HighlyAvailableServiceWrapper INSTANCE = new 
HighlyAvailableServiceWrapper();
+
+  // Not for public use.
+  private HighlyAvailableServiceWrapper() {}
+
+  public static <I> I service(final I instance, HighlyAvailableService 
service) {
+    InvocationHandler handler = INSTANCE.getInvocationHandler(instance, 
service);
+
+    @SuppressWarnings("unchecked")
+    I proxiedInstance = (I) 
Proxy.newProxyInstance(instance.getClass().getClassLoader(), 
instance.getClass().getInterfaces(), handler);
+    return proxiedInstance;
+  }
+
+  protected <T> HighlyAvailableServiceInvocationHandler<T> 
getInvocationHandler(final T instance, final HighlyAvailableService service) {
+    return new HighlyAvailableServiceInvocationHandler<T>(instance, service);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/base/src/main/java/org/apache/accumulo/server/rpc/NotActiveServiceException.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/NotActiveServiceException.java
 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/NotActiveServiceException.java
new file mode 100644
index 0000000..6846b67
--- /dev/null
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/NotActiveServiceException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+/**
+ * An {@link Exception} which denotes that the service which was invoked is 
not the active instance for that service in the Accumulo cluster.
+ */
+public class NotActiveServiceException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index d9d0f2f..5805272 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -113,6 +113,9 @@ import com.google.common.collect.Maps;
 import com.google.common.net.HostAndPort;
 import com.google.protobuf.InvalidProtocolBufferException;
 
+// Could/Should implement HighlyAvailableService but the Thrift server is 
already started before
+// the ZK lock is acquired. The server is only for metrics, there are no 
concerns about clients
+// using the service before the lock is acquired.
 public class SimpleGarbageCollector extends AccumuloServerContext implements 
Iface {
   private static final Text EMPTY_TEXT = new Text();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java 
b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index b7f6e1a..94f51cd 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -95,6 +95,7 @@ import org.apache.accumulo.master.replication.WorkDriver;
 import org.apache.accumulo.master.state.TableCounts;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.HighlyAvailableService;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.ServerOpts;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -126,6 +127,7 @@ import org.apache.accumulo.server.metrics.Metrics;
 import org.apache.accumulo.server.replication.ZooKeeperInitialization;
 import org.apache.accumulo.server.rpc.RpcWrapper;
 import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.HighlyAvailableServiceWrapper;
 import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
 import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.rpc.ThriftServerType;
@@ -169,7 +171,7 @@ import com.google.common.collect.Iterables;
  *
  * The master will also coordinate log recoveries and reports general status.
  */
-public class Master extends AccumuloServerContext implements 
LiveTServerSet.Listener, TableObserver, CurrentState {
+public class Master extends AccumuloServerContext implements 
LiveTServerSet.Listener, TableObserver, CurrentState, HighlyAvailableService {
 
   final static Logger log = LoggerFactory.getLogger(Master.class);
 
@@ -1132,6 +1134,37 @@ public class Master extends AccumuloServerContext 
implements LiveTServerSet.List
   public void run() throws IOException, InterruptedException, KeeperException {
     final String zroot = ZooUtil.getRoot(getInstance());
 
+    // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a 
sign of process health when a hot-standby
+    //
+    // Start the Master's Client service
+    clientHandler = new MasterClientServiceHandler(this);
+    // Ensure that calls before the master gets the lock fail
+    Iface haProxy = HighlyAvailableServiceWrapper.service(clientHandler, this);
+    Iface rpcProxy = RpcWrapper.service(clientHandler, new 
Processor<Iface>(haProxy));
+    final Processor<Iface> processor;
+    if (ThriftServerType.SASL == getThriftServerType()) {
+      Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, 
clientHandler.getClass(), getConfiguration());
+      processor = new Processor<>(tcredsProxy);
+    } else {
+      processor = new Processor<>(rpcProxy);
+    }
+    ServerAddress sa = TServerUtils.startServer(this, hostname, 
Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service 
Handler", null,
+        Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, 
Property.GENERAL_MAX_MESSAGE_SIZE);
+    clientService = sa.server;
+    log.info("Started Master client service at {}", sa.address);
+
+    // Start the replication coordinator which assigns tservers to service 
replication requests
+    MasterReplicationCoordinator impl = new MasterReplicationCoordinator(this);
+    ReplicationCoordinator.Iface haReplicationProxy = 
HighlyAvailableServiceWrapper.service(impl, this);
+    ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> 
replicationCoordinatorProcessor = new 
ReplicationCoordinator.Processor<>(RpcWrapper.service(
+        impl, new 
ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(haReplicationProxy)));
+    ServerAddress replAddress = TServerUtils.startServer(this, hostname, 
Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor,
+        "Master Replication Coordinator", "Replication Coordinator", null, 
Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS,
+        Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, 
Property.GENERAL_MAX_MESSAGE_SIZE);
+
+    log.info("Started replication coordinator service at " + 
replAddress.address);
+
+    // block until we can obtain the ZK lock for the master
     getMasterLock(zroot + Constants.ZMASTER_LOCK);
 
     recoveryManager = new RecoveryManager(this);
@@ -1235,18 +1268,6 @@ public class Master extends AccumuloServerContext 
implements LiveTServerSet.List
       log.info("AuthenticationTokenSecretManager is initialized");
     }
 
-    clientHandler = new MasterClientServiceHandler(this);
-    Iface rpcProxy = RpcWrapper.service(clientHandler, new 
Processor<Iface>(clientHandler));
-    final Processor<Iface> processor;
-    if (ThriftServerType.SASL == getThriftServerType()) {
-      Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, 
clientHandler.getClass(), getConfiguration());
-      processor = new Processor<>(tcredsProxy);
-    } else {
-      processor = new Processor<>(rpcProxy);
-    }
-    ServerAddress sa = TServerUtils.startServer(this, hostname, 
Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service 
Handler", null,
-        Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, 
Property.GENERAL_MAX_MESSAGE_SIZE);
-    clientService = sa.server;
     String address = sa.address.toString();
     log.info("Setting master lock data to " + address);
     masterLock.replaceLockData(address.getBytes());
@@ -1268,16 +1289,6 @@ public class Master extends AccumuloServerContext 
implements LiveTServerSet.List
     }
     replicationWorkAssigner.start();
 
-    // Start the replication coordinator which assigns tservers to service 
replication requests
-    MasterReplicationCoordinator impl = new MasterReplicationCoordinator(this);
-    ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> 
replicationCoordinatorProcessor = new 
ReplicationCoordinator.Processor<>(RpcWrapper.service(
-        impl, new 
ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(impl)));
-    ServerAddress replAddress = TServerUtils.startServer(this, hostname, 
Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor,
-        "Master Replication Coordinator", "Replication Coordinator", null, 
Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS,
-        Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, 
Property.GENERAL_MAX_MESSAGE_SIZE);
-
-    log.info("Started replication coordinator service at " + 
replAddress.address);
-
     // Advertise that port we used so peers don't have to be told what it is
     
ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) 
+ Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR,
         replAddress.address.toString().getBytes(UTF_8), 
NodeExistsPolicy.OVERWRITE);
@@ -1680,4 +1691,12 @@ public class Master extends AccumuloServerContext 
implements LiveTServerSet.List
   public Long getSteadyTime() {
     return timeKeeper.getTime();
   }
+
+  @Override
+  public boolean isActiveService() {
+    if (null != masterLock) {
+      return masterLock.isLocked();
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
----------------------------------------------------------------------
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 42b9bd1..1e8feaa 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -81,6 +81,7 @@ import org.apache.accumulo.monitor.servlets.trace.ShowTrace;
 import org.apache.accumulo.monitor.servlets.trace.Summary;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.HighlyAvailableService;
 import org.apache.accumulo.server.ServerOpts;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
@@ -104,7 +105,7 @@ import com.google.common.net.HostAndPort;
 /**
  * Serve master statistics with an embedded web server.
  */
-public class Monitor {
+public class Monitor implements HighlyAvailableService {
   private static final Logger log = LoggerFactory.getLogger(Monitor.class);
 
   private static final int REFRESH_TIME = 5;
@@ -117,6 +118,7 @@ public class Monitor {
   private static long totalHoldTime = 0;
   private static long totalLookups = 0;
   private static int totalTables = 0;
+  public static HighlyAvailableService HA_SERVICE_INSTANCE = null;
 
   private static class MaxList<T> extends LinkedList<Pair<Long,T>> {
     private static final long serialVersionUID = 1L;
@@ -436,6 +438,9 @@ public class Monitor {
     log.info("Instance " + instance.getInstanceID());
     Accumulo.init(fs, config, app);
     Monitor monitor = new Monitor();
+    // Servlets need access to limit requests when the monitor is not active, 
but Servlets are instantiated
+    // via reflection. Expose the service this way instead.
+    Monitor.HA_SERVICE_INSTANCE = monitor;
     DistributedTrace.enable(hostname, app, config.getConfiguration());
     try {
       monitor.run(hostname);
@@ -447,13 +452,6 @@ public class Monitor {
   private static long START_TIME;
 
   public void run(String hostname) {
-    try {
-      getMonitorLock();
-    } catch (Exception e) {
-      log.error("Failed to get Monitor ZooKeeper lock");
-      throw new RuntimeException(e);
-    }
-
     Monitor.START_TIME = System.currentTimeMillis();
     int ports[] = config.getConfiguration().getPort(Property.MONITOR_PORT);
     for (int port : ports) {
@@ -490,6 +488,13 @@ public class Monitor {
     }
 
     try {
+      getMonitorLock();
+    } catch (Exception e) {
+      log.error("Failed to get Monitor ZooKeeper lock");
+      throw new RuntimeException(e);
+    }
+
+    try {
       log.debug("Using " + hostname + " to advertise monitor location in 
ZooKeeper");
 
       String monitorAddress = HostAndPort.fromParts(hostname, 
server.getPort()).toString();
@@ -837,4 +842,12 @@ public class Monitor {
   public static AccumuloServerContext getContext() {
     return context;
   }
+
+  @Override
+  public boolean isActiveService() {
+    if (null != monitorLock && monitorLock.isLocked()) {
+      return true;
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java
----------------------------------------------------------------------
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java
index fc329b8..6a8b82a 100644
--- 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java
@@ -38,10 +38,12 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.monitor.Monitor;
 import org.apache.accumulo.server.monitor.DedupedLogEvent;
 import org.apache.accumulo.server.monitor.LogService;
+import org.apache.commons.httpclient.util.HttpURLConnection;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 abstract public class BasicServlet extends HttpServlet {
+  public static final String STANDBY_MONITOR_MESSAGE = "This is not the active 
Monitor";
 
   private static final long serialVersionUID = 1L;
   protected static final Logger log = Logger.getLogger(BasicServlet.class);
@@ -51,8 +53,22 @@ abstract public class BasicServlet extends HttpServlet {
 
   abstract protected String getTitle(HttpServletRequest req);
 
+  public boolean isActiveMonitor() {
+    // If the HighlyAvailableService is not initialized or it's not the active 
service, report false
+    // so that callers can reject the request instead of processing it.
+    if (null == Monitor.HA_SERVICE_INSTANCE || 
!Monitor.HA_SERVICE_INSTANCE.isActiveService()) {
+      return false;
+    }
+    return true;
+  }
+
   @Override
   public void doGet(HttpServletRequest req, HttpServletResponse resp) throws 
ServletException, IOException {
+    // Verify that this is the active Monitor instance
+    if (!isActiveMonitor()) {
+      resp.sendError(HttpURLConnection.HTTP_UNAVAILABLE, 
STANDBY_MONITOR_MESSAGE);
+      return;
+    }
     StringBuilder sb = new StringBuilder();
     try {
       Monitor.fetchData();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java
----------------------------------------------------------------------
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java
index 383d7bc..85c60fb 100644
--- 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/DefaultServlet.java
@@ -39,6 +39,7 @@ import org.apache.accumulo.monitor.Monitor;
 import org.apache.accumulo.monitor.ZooKeeperStatus;
 import org.apache.accumulo.monitor.ZooKeeperStatus.ZooKeeperState;
 import org.apache.accumulo.monitor.util.celltypes.NumberType;
+import org.apache.commons.httpclient.util.HttpURLConnection;
 
 public class DefaultServlet extends BasicServlet {
 
@@ -83,6 +84,11 @@ public class DefaultServlet extends BasicServlet {
 
   @Override
   public void doGet(HttpServletRequest req, HttpServletResponse resp) throws 
ServletException, IOException {
+    // Verify that this is the active Monitor instance
+    if (!isActiveMonitor()) {
+      resp.sendError(HttpURLConnection.HTTP_UNAVAILABLE, 
STANDBY_MONITOR_MESSAGE);
+      return;
+    }
     if (req.getRequestURI().startsWith("/web"))
       getResource(req, resp);
     else if (req.getRequestURI().startsWith("/monitor"))

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java
----------------------------------------------------------------------
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java
index a3dbe8c..412ae2e 100644
--- 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/OperationServlet.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.server.master.state.DeadServerList;
 import org.apache.accumulo.server.monitor.LogService;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.commons.httpclient.util.HttpURLConnection;
 import org.apache.log4j.Logger;
 
 public class OperationServlet extends BasicServlet {
@@ -44,6 +45,11 @@ public class OperationServlet extends BasicServlet {
 
   @Override
   public void doGet(HttpServletRequest req, HttpServletResponse resp) throws 
IOException {
+    // Verify that this is the active Monitor instance
+    if (!isActiveMonitor()) {
+      resp.sendError(HttpURLConnection.HTTP_UNAVAILABLE, 
STANDBY_MONITOR_MESSAGE);
+      return;
+    }
     String redir = null;
     List<Cookie> cookiesToSet = Collections.emptyList();
     try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java
----------------------------------------------------------------------
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java
index 31bea15..1dc8422 100644
--- 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ShellServlet.java
@@ -33,6 +33,7 @@ import javax.servlet.http.HttpSession;
 import jline.console.ConsoleReader;
 
 import org.apache.accumulo.shell.Shell;
+import org.apache.commons.httpclient.util.HttpURLConnection;
 
 public class ShellServlet extends BasicServlet {
   private static final long serialVersionUID = 1L;
@@ -179,6 +180,11 @@ public class ShellServlet extends BasicServlet {
 
   @Override
   protected void doPost(HttpServletRequest req, HttpServletResponse resp) 
throws ServletException, IOException {
+    // Verify that this is the active Monitor instance
+    if (!isActiveMonitor()) {
+      resp.sendError(HttpURLConnection.HTTP_UNAVAILABLE, 
STANDBY_MONITOR_MESSAGE);
+      return;
+    }
     final HttpSession session = req.getSession(true);
     String user = (String) session.getAttribute("user");
     if (user == null || !userShells().containsKey(session.getId())) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e900e674/test/src/main/java/org/apache/accumulo/test/ThriftServerBindsBeforeZooKeeperLockIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ThriftServerBindsBeforeZooKeeperLockIT.java
 
b/test/src/main/java/org/apache/accumulo/test/ThriftServerBindsBeforeZooKeeperLockIT.java
new file mode 100644
index 0000000..2890fc7
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/ThriftServerBindsBeforeZooKeeperLockIT.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.Socket;
+import java.net.URL;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.MonitorUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.gc.SimpleGarbageCollector;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.monitor.Monitor;
+import org.apache.accumulo.monitor.servlets.BasicServlet;
+import org.apache.accumulo.server.util.PortUtils;
+import org.apache.accumulo.test.categories.MiniClusterOnlyTests;
+import org.apache.accumulo.test.functional.FunctionalTestUtils;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Test class that verifies "HA-capable" servers put up their thrift servers before acquiring their ZK lock.
+ */
+@Category({MiniClusterOnlyTests.class})
+public class ThriftServerBindsBeforeZooKeeperLockIT extends 
AccumuloClusterHarness {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ThriftServerBindsBeforeZooKeeperLockIT.class);
+
+  public boolean canRunTest(ClusterType type) {
+    return ClusterType.MINI == type;
+  }
+
+  @Test
+  public void testMonitorService() throws Exception {
+    final MiniAccumuloClusterImpl cluster = (MiniAccumuloClusterImpl) 
getCluster();
+    Collection<ProcessReference> monitors = 
cluster.getProcesses().get(ServerType.MONITOR);
+    // Need to start one monitor and let it become active.
+    if (null == monitors || 0 == monitors.size()) {
+      getClusterControl().start(ServerType.MONITOR, "localhost");
+    }
+
+    final ZooKeeperInstance inst = new 
ZooKeeperInstance(cluster.getClientConfig());
+    while (true) {
+      try {
+        MonitorUtil.getLocation(inst);
+        break;
+      } catch (Exception e) {
+        LOG.debug("Failed to find active monitor location, retrying", e);
+        Thread.sleep(1000);
+      }
+    }
+
+    LOG.debug("Found active monitor");
+
+    while (true) {
+      int freePort = PortUtils.getRandomFreePort();
+      String monitorUrl = "http://localhost:" + freePort;
+      Process monitor = null;
+      try {
+        LOG.debug("Starting standby monitor on {}", freePort);
+        monitor = startProcess(cluster, ServerType.MONITOR, freePort);
+
+        while (true) {
+          URL url = new URL(monitorUrl);
+          try {
+            HttpURLConnection cnxn = (HttpURLConnection) url.openConnection();
+            final int responseCode = cnxn.getResponseCode();
+            final String errorText = 
FunctionalTestUtils.readAll(cnxn.getErrorStream());
+            // This is our "assertion", but we want to re-check it if it's not what we expect
+            if (HttpURLConnection.HTTP_UNAVAILABLE == responseCode && null != errorText && errorText.contains(BasicServlet.STANDBY_MONITOR_MESSAGE)) {
+              return;
+            }
+            LOG.debug("Unexpected responseCode and/or error text, will retry: '{}' '{}'", responseCode, errorText);
+          } catch (Exception e) {
+            LOG.debug("Caught exception trying to fetch monitor info", e);
+          }
+          // Wait before trying again
+          Thread.sleep(1000);
+          // Make sure the process is still up. Possible the "randomFreePort" we got wasn't actually free and the process
+          // died trying to bind it. Pick a new port and restart it in that case.
+          if (!monitor.isAlive()) {
+            freePort = PortUtils.getRandomFreePort();
+            monitorUrl = "http://localhost:" + freePort;
+            LOG.debug("Monitor died, restarting it listening on {}", freePort);
+            monitor = startProcess(cluster, ServerType.MONITOR, freePort);
+          }
+        }
+      } finally {
+        if (null != monitor) {
+          monitor.destroyForcibly();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testMasterService() throws Exception {
+    final MiniAccumuloClusterImpl cluster = (MiniAccumuloClusterImpl) 
getCluster();
+    final ZooKeeperInstance inst = new 
ZooKeeperInstance(cluster.getClientConfig());
+
+    // Wait for the Master to grab its lock
+    while (true) {
+      final ZooReader reader = new ZooReader(inst.getZooKeepers(), 30000);
+      try {
+        List<String> locks = reader.getChildren(Constants.ZROOT + "/" + 
inst.getInstanceID() + Constants.ZMASTER_LOCK);
+        if (locks.size() > 0) {
+          break;
+        }
+      } catch (Exception e) {
+        LOG.debug("Failed to find active master location, retrying", e);
+        Thread.sleep(1000);
+      }
+    }
+
+    LOG.debug("Found active master");
+
+    while (true) {
+      int freePort = PortUtils.getRandomFreePort();
+      Process master = null;
+      try {
+        LOG.debug("Starting standby master on {}", freePort);
+        master = startProcess(cluster, ServerType.MASTER, freePort);
+
+        while (true) {
+          Socket s = null;
+          try {
+            s = new Socket("localhost", freePort);
+            if (s.isConnected()) {
+              // Pass
+              return;
+            }
+          } catch (Exception e) {
+            LOG.debug("Caught exception trying to connect to Master", e);
+          } finally {
+            if (null != s) {
+              s.close();
+            }
+          }
+          // Wait before trying again
+          Thread.sleep(1000);
+          // Make sure the process is still up. Possible the "randomFreePort" we got wasn't actually free and the process
+          // died trying to bind it. Pick a new port and restart it in that case.
+          if (!master.isAlive()) {
+            freePort = PortUtils.getRandomFreePort();
+            LOG.debug("Master died, restarting it listening on {}", freePort);
+            master = startProcess(cluster, ServerType.MASTER, freePort);
+          }
+        }
+      } finally {
+        if (null != master) {
+          master.destroyForcibly();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testGarbageCollectorPorts() throws Exception {
+    final MiniAccumuloClusterImpl cluster = (MiniAccumuloClusterImpl) 
getCluster();
+    final ZooKeeperInstance inst = new 
ZooKeeperInstance(cluster.getClientConfig());
+
+    // Wait for the garbage collector to grab its lock
+    while (true) {
+      final ZooReader reader = new ZooReader(inst.getZooKeepers(), 30000);
+      try {
+        List<String> locks = reader.getChildren(Constants.ZROOT + "/" + 
inst.getInstanceID() + Constants.ZGC_LOCK);
+        if (locks.size() > 0) {
+          break;
+        }
+      } catch (Exception e) {
+        LOG.debug("Failed to find active gc location, retrying", e);
+        Thread.sleep(1000);
+      }
+    }
+
+    LOG.debug("Found active gc");
+
+    while (true) {
+      int freePort = PortUtils.getRandomFreePort();
+      Process master = null;
+      try {
+        LOG.debug("Starting standby gc on {}", freePort);
+        master = startProcess(cluster, ServerType.GARBAGE_COLLECTOR, freePort);
+
+        while (true) {
+          Socket s = null;
+          try {
+            s = new Socket("localhost", freePort);
+            if (s.isConnected()) {
+              // Pass
+              return;
+            }
+          } catch (Exception e) {
+            LOG.debug("Caught exception trying to connect to GC", e);
+          } finally {
+            if (null != s) {
+              s.close();
+            }
+          }
+          // Wait before trying again
+          Thread.sleep(1000);
+          // Make sure the process is still up. Possible the "randomFreePort" we got wasn't actually free and the process
+          // died trying to bind it. Pick a new port and restart it in that case.
+          if (!master.isAlive()) {
+            freePort = PortUtils.getRandomFreePort();
+            LOG.debug("GC died, restarting it listening on {}", freePort);
+            master = startProcess(cluster, ServerType.GARBAGE_COLLECTOR, 
freePort);
+          }
+        }
+      } finally {
+        if (null != master) {
+          master.destroyForcibly();
+        }
+      }
+    }
+  }
+
+  private Process startProcess(MiniAccumuloClusterImpl cluster, ServerType 
serverType, int port) throws IOException {
+    final Property property;
+    final Class<?> service;
+    switch (serverType) {
+      case MONITOR:
+        property = Property.MONITOR_PORT;
+        service = Monitor.class;
+        break;
+      case MASTER:
+        property = Property.MASTER_CLIENTPORT;
+        service = Master.class;
+        break;
+      case GARBAGE_COLLECTOR:
+        property = Property.GC_PORT;
+        service = SimpleGarbageCollector.class;
+        break;
+      default:
+        throw new IllegalArgumentException("Irrelevant server type for test");
+    }
+
+    return cluster._exec(service, serverType, 
ImmutableMap.of(property.getKey(), Integer.toString(port)));
+  }
+}

Reply via email to