Repository: accumulo
Updated Branches:
  refs/heads/master 930592f5e -> ee8e0705c


http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/server/master/src/main/java/org/apache/accumulo/master/MasterTime.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/MasterTime.java 
b/server/master/src/main/java/org/apache/accumulo/master/MasterTime.java
new file mode 100644
index 0000000..27c57f0
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/MasterTime.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Timer;
+import java.util.TimerTask;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Keep a persistent roughly monotone view of how long a master has been overseeing this cluster.
+ *
+ * <p>
+ * The running total is kept, in nanoseconds, in a ZooKeeper node. A timer fires every ten seconds and either re-reads that node (master state
+ * {@code INITIAL} or {@code STOP}) or overwrites it with the current total (the remaining states handled in {@link #run()}).
+ */
+public class MasterTime extends TimerTask {
+  private static final Logger log = LoggerFactory.getLogger(MasterTime.class);
+
+  private final String zPath;
+  private final ZooReaderWriter zk;
+  private final Master master;
+  private final Timer timer;
+
+  /** Difference between time stored in ZooKeeper and System.nanoTime() when we last read from ZooKeeper. Read and updated under this object's monitor. */
+  private long skewAmount;
+
+  /**
+   * Puts an initial value of 0 at the tick node using {@code NodeExistsPolicy.SKIP}, reads the node to establish the skew, and starts the periodic timer.
+   *
+   * @param master
+   *          master whose state decides, on each timer run, whether the tick node is read or written
+   * @throws IOException
+   *           if the tick node cannot be written, read, or parsed
+   */
+  public MasterTime(Master master) throws IOException {
+    this.zPath = ZooUtil.getRoot(master.getInstance()) + Constants.ZMASTER_TICK;
+    this.zk = ZooReaderWriter.getInstance();
+    this.master = master;
+
+    try {
+      zk.putPersistentData(zPath, "0".getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.SKIP);
+      skewAmount = Long.parseLong(new String(zk.getData(zPath, null), StandardCharsets.UTF_8)) - System.nanoTime();
+    } catch (Exception ex) {
+      throw new IOException("Error updating master time", ex);
+    }
+
+    this.timer = new Timer();
+    timer.schedule(this, 0, MILLISECONDS.convert(10, SECONDS));
+  }
+
+  /**
+   * How long has this cluster had a Master?
+   *
+   * @return Approximate total duration this cluster has had a Master, in milliseconds.
+   */
+  public synchronized long getTime() {
+    return MILLISECONDS.convert(System.nanoTime() + skewAmount, NANOSECONDS);
+  }
+
+  /** Shut down the time keeping. */
+  public void shutdown() {
+    timer.cancel();
+  }
+
+  @Override
+  public void run() {
+    switch (master.getMasterState()) {
+    // If we don't have the lock, periodically re-read the value in ZooKeeper, in case there's another master we're
+    // shadowing for.
+      case INITIAL:
+      case STOP:
+        try {
+          long zkTime = Long.parseLong(new String(zk.getData(zPath, null), StandardCharsets.UTF_8));
+          synchronized (this) {
+            skewAmount = zkTime - System.nanoTime();
+          }
+        } catch (Exception ex) {
+          // Best effort: keep the previous skew and try again on the next timer run.
+          log.debug("Failed to retrieve master tick time", ex);
+        }
+        break;
+      // If we do have the lock, periodically write our clock to ZooKeeper.
+      case HAVE_LOCK:
+      case SAFE_MODE:
+      case NORMAL:
+      case UNLOAD_METADATA_TABLETS:
+      case UNLOAD_ROOT_TABLET:
+        try {
+          final long currentTotalNanos;
+          // Read the skew under the same monitor getTime() uses; do the ZooKeeper write outside it.
+          synchronized (this) {
+            currentTotalNanos = System.nanoTime() + skewAmount;
+          }
+          zk.putPersistentData(zPath, Long.toString(currentTotalNanos).getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE);
+        } catch (Exception ex) {
+          // Best effort: the next timer run writes a fresh value.
+          log.debug("Failed to update master tick time", ex);
+        }
+        break;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
 
b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index e20335b..2cf7d9d 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -16,8 +16,8 @@
  */
 package org.apache.accumulo.master;
 
+import com.google.common.collect.ImmutableSortedSet;
 import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-import static java.lang.Math.min;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -92,8 +92,13 @@ import org.apache.thrift.TException;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterators;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import static java.lang.Math.min;
+import java.util.SortedSet;
+import static java.lang.Math.min;
 
-class TabletGroupWatcher extends Daemon {
+abstract class TabletGroupWatcher extends Daemon {
   // Constants used to make sure assignment logging isn't excessive in 
quantity or size
   private static final String ASSIGNMENT_BUFFER_SEPARATOR = ", ";
   private static final int ASSINGMENT_BUFFER_MAX_LENGTH = 4096;
@@ -101,9 +106,11 @@ class TabletGroupWatcher extends Daemon {
   private final Master master;
   final TabletStateStore store;
   final TabletGroupWatcher dependentWatcher;
+
   private MasterState masterState;
 
   final TableStats stats = new TableStats();
+  private SortedSet<TServerInstance> lastScanServers = ImmutableSortedSet.of();
 
   TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher 
dependentWatcher) {
     this.master = master;
@@ -111,6 +118,9 @@ class TabletGroupWatcher extends Daemon {
     this.dependentWatcher = dependentWatcher;
   }
 
+  /** Whether tablets assigned to dead servers are suspended (true) or unassigned (false) by this {@code TabletGroupWatcher} when changes are flushed. */
+  abstract boolean canSuspendTablets();
+
   Map<String,TableCounts> getStats() {
     return stats.getLast();
   }
@@ -124,9 +134,13 @@ class TabletGroupWatcher extends Daemon {
     return stats.getLast(tableId);
   }
 
+  /** True if {@code candidates} equals the set of tservers recorded for the most recent scan pass (the empty set when that pass found no tservers). */
+  public synchronized boolean isSameTserversAsLastScan(Set<TServerInstance> 
candidates) {
+    return candidates.equals(lastScanServers);
+  }
+
   @Override
   public void run() {
-
     Thread.currentThread().setName("Watching " + store.name());
     int[] oldCounts = new int[TabletState.values().length];
     EventCoordinator.Listener eventListener = 
this.master.nextEvent.getListener();
@@ -158,6 +172,9 @@ class TabletGroupWatcher extends Daemon {
 
         if (currentTServers.size() == 0) {
           eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+          synchronized (this) {
+            lastScanServers = ImmutableSortedSet.of();
+          }
           continue;
         }
 
@@ -165,9 +182,10 @@ class TabletGroupWatcher extends Daemon {
         SortedMap<TServerInstance,TabletServerStatus> destinations = new 
TreeMap<>(currentTServers);
         destinations.keySet().removeAll(this.master.serversToShutdown);
 
-        List<Assignment> assignments = new ArrayList<>();
-        List<Assignment> assigned = new ArrayList<>();
+        List<Assignment> assignments = new ArrayList<Assignment>();
+        List<Assignment> assigned = new ArrayList<Assignment>();
         List<TabletLocationState> assignedToDeadServers = new ArrayList<>();
+        List<TabletLocationState> suspendedToGoneServers = new ArrayList<>();
         Map<KeyExtent,TServerInstance> unassigned = new HashMap<>();
         Map<TServerInstance,List<Path>> logsForDeadServers = new TreeMap<>();
 
@@ -192,15 +210,18 @@ class TabletGroupWatcher extends Daemon {
 
           // Don't overwhelm the tablet servers with work
           if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * 
currentTServers.size()) {
-            flushChanges(destinations, assignments, assigned, 
assignedToDeadServers, logsForDeadServers, unassigned);
+            flushChanges(destinations, assignments, assigned, 
assignedToDeadServers, logsForDeadServers, suspendedToGoneServers, unassigned);
             assignments.clear();
             assigned.clear();
             assignedToDeadServers.clear();
+            suspendedToGoneServers.clear();
             unassigned.clear();
             unloaded = 0;
             eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
           }
           String tableId = tls.extent.getTableId();
+          TableConfiguration tableConf = 
this.master.getConfigurationFactory().getTableConfiguration(tableId);
+
           MergeStats mergeStats = mergeStatsCache.get(tableId);
           if (mergeStats == null) {
             mergeStats = currentMerges.get(tableId);
@@ -226,7 +247,7 @@ class TabletGroupWatcher extends Daemon {
           }
 
           // if we are shutting down all the tabletservers, we have to do it 
in order
-          if (goal == TabletGoalState.UNASSIGNED && state == 
TabletState.HOSTED) {
+          if (goal == TabletGoalState.SUSPENDED && state == 
TabletState.HOSTED) {
             if 
(this.master.serversToShutdown.equals(currentTServers.keySet())) {
               if (dependentWatcher != null && 
dependentWatcher.assignedOrHosted() > 0) {
                 goal = TabletGoalState.HOSTED;
@@ -253,6 +274,29 @@ class TabletGroupWatcher extends Daemon {
                   logsForDeadServers.put(tserver, wals.getWalsInUse(tserver));
                 }
                 break;
+              case SUSPENDED:
+                if (master.getSteadyTime() - tls.suspend.suspensionTime < 
tableConf.getTimeInMillis(Property.TABLE_SUSPEND_DURATION)) {
+                  // Tablet is suspended. See if its tablet server is back.
+                  TServerInstance returnInstance = null;
+                  Iterator<TServerInstance> find = destinations.tailMap(new 
TServerInstance(tls.suspend.server, " ")).keySet().iterator();
+                  if (find.hasNext()) {
+                    TServerInstance found = find.next();
+                    if (found.getLocation().equals(tls.suspend.server)) {
+                      returnInstance = found;
+                    }
+                  }
+
+                  // Old tablet server is back. Return this tablet to its 
previous owner.
+                  if (returnInstance != null) {
+                    assignments.add(new Assignment(tls.extent, 
returnInstance));
+                  } else {
+                    // leave suspended, don't ask for a new assignment.
+                  }
+                } else {
+                  // Treat as unassigned, ask for a new assignment.
+                  unassigned.put(tls.extent, server);
+                }
+                break;
               case UNASSIGNED:
                 // maybe it's a finishing migration
                 TServerInstance dest = this.master.migrations.get(tls.extent);
@@ -276,6 +320,10 @@ class TabletGroupWatcher extends Daemon {
             }
           } else {
             switch (state) {
+              case SUSPENDED:
+                // Request a move to UNASSIGNED, so as to allow balancing to 
continue.
+                suspendedToGoneServers.add(tls);
+                // Fall through to unassigned to cancel migrations.
               case UNASSIGNED:
                 TServerInstance dest = this.master.migrations.get(tls.extent);
                 TableState tableState = 
TableManager.getInstance().getTableState(tls.extent.getTableId());
@@ -292,7 +340,7 @@ class TabletGroupWatcher extends Daemon {
               case HOSTED:
                 TServerConnection conn = 
this.master.tserverSet.getConnection(server);
                 if (conn != null) {
-                  conn.unloadTablet(this.master.masterLock, tls.extent, goal 
!= TabletGoalState.DELETED);
+                  conn.unloadTablet(this.master.masterLock, tls.extent, 
goal.howUnload(), master.getSteadyTime());
                   unloaded++;
                   totalUnloaded++;
                 } else {
@@ -306,7 +354,7 @@ class TabletGroupWatcher extends Daemon {
           counts[state.ordinal()]++;
         }
 
-        flushChanges(destinations, assignments, assigned, 
assignedToDeadServers, logsForDeadServers, unassigned);
+        flushChanges(destinations, assignments, assigned, 
assignedToDeadServers, logsForDeadServers, suspendedToGoneServers, unassigned);
 
         // provide stats after flushing changes to avoid race conditions w/ 
delete table
         stats.end(masterState);
@@ -326,6 +374,9 @@ class TabletGroupWatcher extends Daemon {
 
         updateMergeState(mergeStatsCache);
 
+        synchronized (this) {
+          lastScanServers = 
ImmutableSortedSet.copyOf(currentTServers.keySet());
+        }
         if 
(this.master.tserverSet.getCurrentServers().equals(currentTServers.keySet())) {
           Master.log.debug(String.format("[%s] sleeping for %.2f seconds", 
store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
           eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
@@ -749,15 +800,25 @@ class TabletGroupWatcher extends Daemon {
   }
 
   private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> 
currentTServers, List<Assignment> assignments, List<Assignment> assigned,
-      List<TabletLocationState> assignedToDeadServers, 
Map<TServerInstance,List<Path>> logsForDeadServers, 
Map<KeyExtent,TServerInstance> unassigned)
-      throws DistributedStoreException, TException, WalMarkerException {
+      List<TabletLocationState> assignedToDeadServers, 
Map<TServerInstance,List<Path>> logsForDeadServers, List<TabletLocationState> 
suspendedToGoneServers,
+      Map<KeyExtent,TServerInstance> unassigned) throws 
DistributedStoreException, TException, WalMarkerException {
+    boolean tabletsSuspendable = canSuspendTablets();
     if (!assignedToDeadServers.isEmpty()) {
       int maxServersToShow = min(assignedToDeadServers.size(), 100);
       Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "...");
       Master.log.debug("logs for dead servers: " + logsForDeadServers);
-      store.unassign(assignedToDeadServers, logsForDeadServers);
+      if (tabletsSuspendable) {
+        store.suspend(assignedToDeadServers, logsForDeadServers, master.getSteadyTime());
+      } else {
+        store.unassign(assignedToDeadServers, logsForDeadServers);
+      }
       this.master.markDeadServerLogsAsClosed(logsForDeadServers);
-      this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
+      this.master.nextEvent.event("Marked %d tablets as %s because they don't have current servers", assignedToDeadServers.size(), tabletsSuspendable ? "suspended" : "unassigned");
+    }
+    if (!suspendedToGoneServers.isEmpty()) { // suspended tablets the scan flagged for a move back to unassigned
+      int maxServersToShow = min(suspendedToGoneServers.size(), 100);
+      Master.log.debug(suspendedToGoneServers.size() + " suspended to gone servers: " + suspendedToGoneServers.subList(0, maxServersToShow) + "...");
+      store.unsuspend(suspendedToGoneServers);
     }
 
     if (!currentTServers.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java 
b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
index a89100e..4cb858c 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
@@ -99,7 +99,7 @@ public class MergeStats {
     this.total++;
     if (state.equals(TabletState.HOSTED))
       this.hosted++;
-    if (state.equals(TabletState.UNASSIGNED))
+    if (state.equals(TabletState.UNASSIGNED) || 
state.equals(TabletState.SUSPENDED))
       this.unassigned++;
   }
 
@@ -217,7 +217,7 @@ public class MergeStats {
           return false;
         }
 
-        if (tls.getState(master.onlineTabletServers()) != 
TabletState.UNASSIGNED) {
+        if (tls.getState(master.onlineTabletServers()) != 
TabletState.UNASSIGNED && tls.getState(master.onlineTabletServers()) != 
TabletState.SUSPENDED) {
           log.debug("failing consistency: assigned or hosted " + tls);
           return false;
         }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java 
b/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java
index 73395ea..dd44bc6 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java
@@ -36,4 +36,10 @@ public class TableCounts {
   public int hosted() {
     return counts[TabletState.HOSTED.ordinal()];
   }
+
+  /** Returns the count stored at the {@link TabletState#SUSPENDED} position of {@code counts}. */
+  public int suspended() {
+    final int suspendedSlot = TabletState.SUSPENDED.ordinal();
+    return counts[suspendedSlot];
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
 
b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
index 0bc989e..6497f96 100644
--- 
a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
+++ 
b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
@@ -176,7 +176,7 @@ public class RootTabletStateStoreTest {
     assertEquals(count, 1);
     TabletLocationState assigned = null;
     try {
-      assigned = new TabletLocationState(root, server, null, null, null, 
false);
+      assigned = new TabletLocationState(root, server, null, null, null, null, 
false);
     } catch (BadLocationStateException e) {
       fail("Unexpected error " + e);
     }
@@ -203,7 +203,7 @@ public class RootTabletStateStoreTest {
 
     TabletLocationState broken = null;
     try {
-      broken = new TabletLocationState(notRoot, server, null, null, null, 
false);
+      broken = new TabletLocationState(notRoot, server, null, null, null, 
null, false);
     } catch (BadLocationStateException e) {
       fail("Unexpected error " + e);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 94e2ed9..c4df66d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -258,6 +258,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.net.HostAndPort;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
 
 public class TabletServer extends AccumuloServerContext implements Runnable {
 
@@ -329,7 +332,7 @@ public class TabletServer extends AccumuloServerContext 
implements Runnable {
   private final ZooAuthenticationKeyWatcher authKeyWatcher;
   private final WalStateManager walMarker;
 
-  public TabletServer(ServerConfigurationFactory confFactory, VolumeManager 
fs) {
+  public TabletServer(ServerConfigurationFactory confFactory, VolumeManager 
fs) throws IOException {
     super(confFactory);
     this.confFactory = confFactory;
     this.fs = fs;
@@ -1549,7 +1552,7 @@ public class TabletServer extends AccumuloServerContext 
implements Runnable {
     }
 
     @Override
-    public void unloadTablet(TInfo tinfo, TCredentials credentials, String 
lock, TKeyExtent textent, boolean save) {
+    public void unloadTablet(TInfo tinfo, TCredentials credentials, String 
lock, TKeyExtent textent, TUnloadTabletGoal goal, long requestTime) {
       try {
         checkPermission(credentials, lock, "unloadTablet");
       } catch (ThriftSecurityException e) {
@@ -1559,7 +1562,7 @@ public class TabletServer extends AccumuloServerContext 
implements Runnable {
 
       KeyExtent extent = new KeyExtent(textent);
 
-      resourceManager.addMigration(extent, new LoggingRunnable(log, new 
UnloadTabletHandler(extent, save)));
+      resourceManager.addMigration(extent, new LoggingRunnable(log, new 
UnloadTabletHandler(extent, goal, requestTime)));
     }
 
     @Override
@@ -1939,11 +1942,13 @@ public class TabletServer extends AccumuloServerContext 
implements Runnable {
 
   private class UnloadTabletHandler implements Runnable {
     private final KeyExtent extent;
-    private final boolean saveState;
+    private final TUnloadTabletGoal goalState;
+    private final long requestTimeSkew;
 
-    public UnloadTabletHandler(KeyExtent extent, boolean saveState) {
+    public UnloadTabletHandler(KeyExtent extent, TUnloadTabletGoal goalState, 
long requestTime) {
       this.extent = extent;
-      this.saveState = saveState;
+      this.goalState = goalState;
+      this.requestTimeSkew = requestTime - 
MILLISECONDS.convert(System.nanoTime(), NANOSECONDS);
     }
 
     @Override
@@ -1982,7 +1987,7 @@ public class TabletServer extends AccumuloServerContext 
implements Runnable {
       }
 
       try {
-        t.close(saveState);
+        t.close(!goalState.equals(TUnloadTabletGoal.DELETED));
       } catch (Throwable e) {
 
         if ((t.isClosing() || t.isClosed()) && e instanceof 
IllegalStateException) {
@@ -2003,12 +2008,18 @@ public class TabletServer extends AccumuloServerContext 
implements Runnable {
         TServerInstance instance = new TServerInstance(clientAddress, 
getLock().getSessionId());
         TabletLocationState tls = null;
         try {
-          tls = new TabletLocationState(extent, null, instance, null, null, 
false);
+          tls = new TabletLocationState(extent, null, instance, null, null, 
null, false);
         } catch (BadLocationStateException e) {
           log.error("Unexpected error ", e);
         }
-        log.debug("Unassigning " + tls);
-        TabletStateStore.unassign(TabletServer.this, tls, null);
+        if (!goalState.equals(TUnloadTabletGoal.SUSPENDED) || 
extent.isRootTablet()
+            || (extent.isMeta() && 
!getConfiguration().getBoolean(Property.MASTER_METADATA_SUSPENDABLE))) {
+          log.debug("Unassigning " + tls);
+          TabletStateStore.unassign(TabletServer.this, tls, null);
+        } else {
+          log.debug("Suspending " + tls);
+          TabletStateStore.suspend(TabletServer.this, tls, null, 
requestTimeSkew + MILLISECONDS.convert(System.nanoTime(), NANOSECONDS));
+        }
       } catch (DistributedStoreException ex) {
         log.warn("Unable to update storage", ex);
       } catch (KeeperException e) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java 
b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
index 30584a6..2d233c4 100644
--- a/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
@@ -185,7 +185,7 @@ public class MergeStateIT extends ConfigurableMacBase {
     // take it offline
     m = tablet.getPrevRowUpdateMutation();
     Collection<Collection<String>> walogs = Collections.emptyList();
-    metaDataStateStore.unassign(Collections.singletonList(new 
TabletLocationState(tablet, null, state.someTServer, null, walogs, false)), 
null);
+    metaDataStateStore.unassign(Collections.singletonList(new 
TabletLocationState(tablet, null, state.someTServer, null, null, walogs, 
false)), null);
 
     // now we can split
     stats = scan(state, metaDataStateStore);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java 
b/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java
new file mode 100644
index 0000000..edd1aff
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.master;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.ClientExec;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.server.master.state.MetaDataTableScanner;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.net.HostAndPort;
+
+public class SuspendedTabletsIT extends ConfigurableMacBase {
+  private static final Logger log = 
LoggerFactory.getLogger(SuspendedTabletsIT.class);
+  private static final Random RANDOM = new Random();
+  private static ExecutorService THREAD_POOL;
+
+  public static final int TSERVERS = 5;
+  public static final long SUSPEND_DURATION = MILLISECONDS.convert(30, 
SECONDS);
+  public static final int TABLETS = 100;
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration fsConf) {
+    cfg.setProperty(Property.TABLE_SUSPEND_DURATION, SUSPEND_DURATION + "ms");
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+    cfg.setNumTservers(TSERVERS);
+  }
+
+  @Test
+  public void crashAndResumeTserver() throws Exception {
+    // Run the test body. When we get to the point where we need a tserver to 
go away, get rid of it via crashing
+    suspensionTestBody(new TServerKiller() {
+      @Override
+      public void eliminateTabletServers(ClientContext ctx, TabletLocations 
locs, int count) throws Exception {
+        List<ProcessReference> procs = new 
ArrayList<>(getCluster().getProcesses().get(ServerType.TABLET_SERVER));
+        Collections.shuffle(procs);
+
+        for (int i = 0; i < count; ++i) {
+          ProcessReference pr = procs.get(i);
+          log.info("Crashing {}", pr.getProcess());
+          getCluster().killProcess(ServerType.TABLET_SERVER, pr);
+        }
+      }
+    });
+  }
+
+  @Test
+  public void shutdownAndResumeTserver() throws Exception {
+    // Run the test body. When we get to the point where we need tservers to 
go away, stop them via a clean shutdown.
+    suspensionTestBody(new TServerKiller() {
+      @Override
+      public void eliminateTabletServers(final ClientContext ctx, 
TabletLocations locs, int count) throws Exception {
+        Set<TServerInstance> tserversSet = new HashSet<>();
+        for (TabletLocationState tls : locs.locationStates.values()) {
+          if (tls.current != null) {
+            tserversSet.add(tls.current);
+          }
+        }
+        List<TServerInstance> tserversList = new ArrayList<>(tserversSet);
+        Collections.shuffle(tserversList, RANDOM);
+
+        for (int i = 0; i < count; ++i) {
+          final String tserverName = tserversList.get(i).toString();
+          MasterClient.execute(ctx, new 
ClientExec<MasterClientService.Client>() {
+            @Override
+            public void execute(MasterClientService.Client client) throws 
Exception {
+              log.info("Sending shutdown command to {} via 
MasterClientService", tserverName);
+              client.shutdownTabletServer(null, ctx.rpcCreds(), tserverName, 
false);
+            }
+          });
+        }
+
+        log.info("Waiting for tserver process{} to die", count == 1 ? "" : 
"es");
+        for (int i = 0; i < 10; ++i) {
+          List<ProcessReference> deadProcs = new ArrayList<>();
+          for (ProcessReference pr : 
getCluster().getProcesses().get(ServerType.TABLET_SERVER)) {
+            Process p = pr.getProcess();
+            if (!p.isAlive()) {
+              deadProcs.add(pr);
+            }
+          }
+          for (ProcessReference pr : deadProcs) {
+            log.info("Process {} is dead, informing cluster control about 
this", pr.getProcess());
+            
getCluster().getClusterControl().killProcess(ServerType.TABLET_SERVER, pr);
+            --count;
+          }
+          if (count == 0) {
+            return;
+          } else {
+            Thread.sleep(MILLISECONDS.convert(2, SECONDS));
+          }
+        }
+        throw new IllegalStateException("Tablet servers didn't die!");
+      }
+    });
+  }
+
+  /**
+   * Main test body for suspension tests.
+   *
+   * @param serverStopper
+   *          callback which shuts down some tablet servers.
+   */
+  private void suspensionTestBody(TServerKiller serverStopper) throws 
Exception {
+    Credentials creds = new Credentials("root", new 
PasswordToken(ROOT_PASSWORD));
+    Instance instance = new ZooKeeperInstance(getCluster().getClientConfig());
+    ClientContext ctx = new ClientContext(instance, creds, 
getCluster().getClientConfig());
+
+    String tableName = getUniqueNames(1)[0];
+
+    Connector conn = ctx.getConnector();
+
+    // Create a table with a bunch of splits
+    log.info("Creating table " + tableName);
+    conn.tableOperations().create(tableName);
+    SortedSet<Text> splitPoints = new TreeSet<>();
+    for (int i = 1; i < TABLETS; ++i) {
+      splitPoints.add(new Text("" + i));
+    }
+    conn.tableOperations().addSplits(tableName, splitPoints);
+
+    // Wait for all of the tablets to be hosted ...
+    log.info("Waiting on hosting and balance");
+    TabletLocations ds;
+    for (ds = TabletLocations.retrieve(ctx, tableName); ds.hostedCount != 
TABLETS; ds = TabletLocations.retrieve(ctx, tableName)) {
+      Thread.sleep(1000);
+    }
+
+    // ... and balanced.
+    conn.instanceOperations().waitForBalance();
+    do {
+      // Give at least another 5 seconds for migrations to finish up
+      Thread.sleep(5000);
+      ds = TabletLocations.retrieve(ctx, tableName);
+    } while (ds.hostedCount != TABLETS);
+
+    // Pray all of our tservers have at least 1 tablet.
+    Assert.assertEquals(TSERVERS, ds.hosted.keySet().size());
+
+    // Kill two tablet servers hosting our tablets. This should put tablets 
into suspended state, and thus halt balancing.
+
+    TabletLocations beforeDeathState = ds;
+    log.info("Eliminating tablet servers");
+    serverStopper.eliminateTabletServers(ctx, beforeDeathState, 2);
+
+    // Eventually some tablets will be suspended.
+    log.info("Waiting on suspended tablets");
+    ds = TabletLocations.retrieve(ctx, tableName);
+    // Until we can scan the metadata table, the master probably can't either, 
so won't have been able to suspend the tablets.
+    // So we note the time that we were first able to successfully scan the 
metadata table.
+    long killTime = System.nanoTime();
+    while (ds.suspended.keySet().size() != 2) {
+      Thread.sleep(1000);
+      ds = TabletLocations.retrieve(ctx, tableName);
+    }
+
+    SetMultimap<HostAndPort,KeyExtent> deadTabletsByServer = ds.suspended;
+
+    // By this point, all tablets should be either hosted or suspended. All 
suspended tablets should
+    // "belong" to the dead tablet servers, and should be in exactly the same 
place as before any tserver death.
+    for (HostAndPort server : deadTabletsByServer.keySet()) {
+      Assert.assertEquals(deadTabletsByServer.get(server), 
beforeDeathState.hosted.get(server));
+    }
+    Assert.assertEquals(TABLETS, ds.hostedCount + ds.suspendedCount);
+
+    // Restart the first tablet server, making sure it ends up on the same port
+    HostAndPort restartedServer = 
deadTabletsByServer.keySet().iterator().next();
+    log.info("Restarting " + restartedServer);
+    getCluster().getClusterControl().start(ServerType.TABLET_SERVER, null,
+        ImmutableMap.of(Property.TSERV_CLIENTPORT.getKey(), "" + 
restartedServer.getPort(), Property.TSERV_PORTSEARCH.getKey(), "false"), 1);
+
+    // Eventually, the suspended tablets should be reassigned to the newly 
alive tserver.
+    log.info("Awaiting tablet unsuspension for tablets belonging to " + 
restartedServer);
+    for (ds = TabletLocations.retrieve(ctx, tableName); 
ds.suspended.containsKey(restartedServer) || ds.assignedCount != 0; ds = 
TabletLocations.retrieve(ctx,
+        tableName)) {
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals(deadTabletsByServer.get(restartedServer), 
ds.hosted.get(restartedServer));
+
+    // Finally, after much longer, remaining suspended tablets should be 
reassigned.
+    log.info("Awaiting tablet reassignment for remaining tablets");
+    for (ds = TabletLocations.retrieve(ctx, tableName); ds.hostedCount != 
TABLETS; ds = TabletLocations.retrieve(ctx, tableName)) {
+      Thread.sleep(1000);
+    }
+
+    long recoverTime = System.nanoTime();
+    Assert.assertTrue(recoverTime - killTime >= 
NANOSECONDS.convert(SUSPEND_DURATION, MILLISECONDS));
+  }
+
+  private static interface TServerKiller {
+    public void eliminateTabletServers(ClientContext ctx, TabletLocations 
locs, int count) throws Exception;
+  }
+
+  private static final AtomicInteger threadCounter = new AtomicInteger(0);
+
+  @BeforeClass
+  public static void init() {
+    THREAD_POOL = Executors.newCachedThreadPool(new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "Scanning deadline thread #" + 
threadCounter.incrementAndGet());
+      }
+    });
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    THREAD_POOL.shutdownNow();
+  }
+
+  private static class TabletLocations {
+    public final Map<KeyExtent,TabletLocationState> locationStates = new 
HashMap<>();
+    public final SetMultimap<HostAndPort,KeyExtent> hosted = 
HashMultimap.create();
+    public final SetMultimap<HostAndPort,KeyExtent> suspended = 
HashMultimap.create();
+    public int hostedCount = 0;
+    public int assignedCount = 0;
+    public int suspendedCount = 0;
+    public int unassignedCount = 0;
+
+    private TabletLocations() {}
+
+    /**
+     * Scans the metadata table for the given table's tablet locations, retrying on failure.
+     *
+     * Each attempt runs the scan on the shared thread pool and waits at most 5 seconds for it. Between failed attempts the method sleeps, doubling the delay
+     * each time up to a cap of 10 seconds. After 30 failed attempts the test is failed via {@link Assert#fail(String)}.
+     *
+     * @param ctx
+     *          client context used to perform the scan
+     * @param tableName
+     *          name of the table whose tablets are collected
+     * @return the tablet locations gathered by the first scan that completes in time
+     * @throws InterruptedException
+     *           if the calling thread is interrupted while waiting for a scan or sleeping between attempts
+     */
+    public static TabletLocations retrieve(final ClientContext ctx, final String tableName) throws Exception {
+      int sleepTime = 200;
+      int remainingAttempts = 30;
+
+      while (true) {
+        FutureTask<TabletLocations> tlsFuture = new FutureTask<>(new Callable<TabletLocations>() {
+          @Override
+          public TabletLocations call() throws Exception {
+            TabletLocations answer = new TabletLocations();
+            answer.scan(ctx, tableName);
+            return answer;
+          }
+        });
+        try {
+          THREAD_POOL.submit(tlsFuture);
+          return tlsFuture.get(5, SECONDS);
+        } catch (TimeoutException ex) {
+          log.debug("Retrieval timed out", ex);
+        } catch (InterruptedException ex) {
+          // Interruption is a request to stop, not a scan failure; do not retry.
+          throw ex;
+        } catch (Exception ex) {
+          log.warn("Failed to scan metadata", ex);
+        } finally {
+          // Stop an abandoned scan so that timed-out attempts do not pile up in the thread pool. This is a no-op when the task has already completed.
+          tlsFuture.cancel(true);
+        }
+        sleepTime = Math.min(2 * sleepTime, 10000);
+        Thread.sleep(sleepTime);
+        --remainingAttempts;
+        if (remainingAttempts == 0) {
+          Assert.fail("Scanning of metadata failed, aborting");
+        }
+      }
+    }
+
+    private void scan(ClientContext ctx, String tableName) throws Exception {
+      Map<String,String> idMap = 
ctx.getConnector().tableOperations().tableIdMap();
+      String tableId = Objects.requireNonNull(idMap.get(tableName));
+      try (MetaDataTableScanner scanner = new MetaDataTableScanner(ctx, new 
Range())) {
+        while (scanner.hasNext()) {
+          TabletLocationState tls = scanner.next();
+
+          if (!tls.extent.getTableId().equals(tableId)) {
+            continue;
+          }
+          locationStates.put(tls.extent, tls);
+          if (tls.suspend != null) {
+            suspended.put(tls.suspend.server, tls.extent);
+            ++suspendedCount;
+          } else if (tls.current != null) {
+            hosted.put(tls.current.getLocation(), tls.extent);
+            ++hostedCount;
+          } else if (tls.future != null) {
+            ++assignedCount;
+          } else {
+            unassignedCount += 1;
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
 
b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index 63a7771..05a0c54 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -79,6 +79,7 @@ import com.beust.jcommander.Parameter;
 import com.google.common.net.HostAndPort;
 
 import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
 
 /**
  * The purpose of this class is to server as fake tserver that is a data sink 
like /dev/null. NullTserver modifies the metadata location entries for a table 
to
@@ -179,7 +180,7 @@ public class NullTserver {
     public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, 
TKeyExtent extent) throws TException {}
 
     @Override
-    public void unloadTablet(TInfo tinfo, TCredentials credentials, String 
lock, TKeyExtent extent, boolean save) throws TException {}
+    public void unloadTablet(TInfo tinfo, TCredentials credentials, String 
lock, TKeyExtent extent, TUnloadTabletGoal goal, long requestTime) throws 
TException {}
 
     @Override
     public List<ActiveScan> getActiveScans(TInfo tinfo, TCredentials 
credentials) throws ThriftSecurityException, TException {

Reply via email to