Author: kturner
Date: Mon Dec 10 20:07:25 2012
New Revision: 1419712

URL: http://svn.apache.org/viewvc?rev=1419712&view=rev
Log:
ACCUMULO-879 Shaved ~1sec off Accumulo startup time by reacting to new tservers 
using zookeeper watchers

Modified:
    
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java

Modified: 
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java?rev=1419712&r1=1419711&r2=1419712&view=diff
==============================================================================
--- 
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
 (original)
+++ 
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
 Mon Dec 10 20:07:25 2012
@@ -22,7 +22,10 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TimerTask;
 
@@ -51,9 +54,11 @@ import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NotEmptyException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.data.Stat;
 
 public class LiveTServerSet implements Watcher {
@@ -204,6 +209,7 @@ public class LiveTServerSet implements W
   
   // Map from tserver master service to server information
   private Map<String,TServerInfo> current = new HashMap<String,TServerInfo>();
+  private HashMap<String,Long> serversToDelete = new HashMap<String,Long>();
 
   public LiveTServerSet(Instance instance, AccumuloConfiguration conf, 
Listener cback) {
     this.cback = cback;
@@ -234,64 +240,124 @@ public class LiveTServerSet implements W
       final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
       
       final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+      
+      Iterator<Entry<String,Long>> serversToDelIter = 
serversToDelete.entrySet().iterator();
+      while (serversToDelIter.hasNext()) {
+        Entry<String,Long> entry = serversToDelIter.next();
+        if (System.currentTimeMillis() - entry.getValue() > 10000) {
+          String serverNode = path + "/" + entry.getKey();
+          serversToDelIter.remove();
+          deleteServerNode(serverNode);
+        }
+      }
+
       for (String server : getZooCache().getChildren(path)) {
-        // See if we have an async lock in place?
-        TServerInfo info = current.get(server);
-        TServerLockWatcher watcher;
-        ZooLock lock;
-        final String lockPath = path + "/" + server;
-        if (info != null) {
-          // yep: get out the lock/watcher so we can check on it
-          watcher = info.watcher;
-          lock = info.lock;
+        if (serversToDelete.containsKey(server))
+          continue;
+
+        checkServer(updates, doomed, path, server, 2);
+      }
+
+      // log.debug("Current: " + current.keySet());
+      if (!doomed.isEmpty() || !updates.isEmpty())
+        this.cback.update(this, doomed, updates);
+    } catch (Exception ex) {
+      log.error(ex, ex);
+    }
+  }
+
+  private void deleteServerNode(String serverNode) throws 
InterruptedException, KeeperException {
+    try {
+      ZooReaderWriter.getInstance().delete(serverNode, -1);
+    } catch (NotEmptyException ex) {
+      // race condition: tserver created the lock after our last check; we'll 
see it at the next check
+    } catch (NoNodeException nne) {
+      // someone else deleted it
+    }
+  }
+  
+  private synchronized void checkServer(final Set<TServerInstance> updates, 
final Set<TServerInstance> doomed, final String path, final String server,
+      int recurse)
+      throws TException,
+      InterruptedException, KeeperException {
+    
+    if (recurse == 0)
+      return;
+
+    // See if we have an async lock in place?
+    TServerInfo info = current.get(server);
+    TServerLockWatcher watcher;
+    ZooLock lock;
+    final String lockPath = path + "/" + server;
+    if (info != null) {
+      // yep: get out the lock/watcher so we can check on it
+      watcher = info.watcher;
+      lock = info.lock;
+    } else {
+      // nope: create a new lock and watcher
+      lock = new ZooLock(lockPath);
+      watcher = new TServerLockWatcher();
+      lock.lockAsync(watcher, "master".getBytes());
+    }
+    TServerInstance instance = null;
+    // Did we win the lock yet?
+    if (!lock.isLocked() && !watcher.gotLock && watcher.failureException == 
null) {
+      // Nope... there's a server out there: is this is a new server?
+      if (info == null) {
+        // Yep: hold onto the information about this server
+        Stat stat = new Stat();
+        byte[] lockData = ZooLock.getLockData(lockPath, stat);
+        String lockString = new String(lockData == null ? new byte[] {} : 
lockData);
+        if (lockString.length() > 0 && !lockString.equals("master")) {
+          ServerServices services = new ServerServices(new String(lockData));
+          InetSocketAddress client = 
services.getAddress(ServerServices.Service.TSERV_CLIENT);
+          InetSocketAddress addr = AddressUtil.parseAddress(server, 
Property.TSERV_CLIENTPORT);
+          TServerConnection conn = new TServerConnection(addr);
+          instance = new TServerInstance(client, stat.getEphemeralOwner());
+          info = new TServerInfo(lock, instance, conn, watcher);
+          current.put(server, info);
+          updates.add(instance);
         } else {
-          // nope: create a new lock and watcher
-          lock = new ZooLock(lockPath);
-          watcher = new TServerLockWatcher();
-          lock.lockAsync(watcher, "master".getBytes());
+          lock.tryToCancelAsyncLockOrUnlock();
         }
-        TServerInstance instance = null;
-        // Did we win the lock yet?
-        if (!lock.isLocked() && !watcher.gotLock && watcher.failureException 
== null) {
-          // Nope... there's a server out there: is this is a new server?
-          if (info == null) {
-            // Yep: hold onto the information about this server
-            Stat stat = new Stat();
-            byte[] lockData = ZooLock.getLockData(lockPath, stat);
-            String lockString = new String(lockData == null ? new byte[] {} : 
lockData);
-            if (lockString.length() > 0 && !lockString.equals("master")) {
-              ServerServices services = new ServerServices(new 
String(lockData));
-              InetSocketAddress client = 
services.getAddress(ServerServices.Service.TSERV_CLIENT);
-              InetSocketAddress addr = AddressUtil.parseAddress(server, 
Property.TSERV_CLIENTPORT);
-              TServerConnection conn = new TServerConnection(addr);
-              instance = new TServerInstance(client, stat.getEphemeralOwner());
-              info = new TServerInfo(lock, instance, conn, watcher);
-              current.put(server, info);
-              updates.add(instance);
-            } else {
-              lock.tryToCancelAsyncLockOrUnlock();
+      }
+    } else {
+      // Yes... there is no server here any more
+      lock.tryToCancelAsyncLockOrUnlock();
+      if (info != null) {
+        // a server existed here and went away so delete its node
+        doomed.add(info.instance);
+        current.remove(server);
+        info.cleanup();
+        deleteServerNode(lockPath);
+      } else {
+        // never knew of this server before... it could be a new server that 
has not created its lock node yet... watch and see if it creates the node or
+        // delete it later if it does not
+        List<String> children = 
ZooReaderWriter.getInstance().getChildren(lockPath, new Watcher() {
+          @Override
+          public void process(WatchedEvent arg0) {
+            if (arg0.getType() == EventType.NodeChildrenChanged) {
+              Set<TServerInstance> updates = new HashSet<TServerInstance>();
+              Set<TServerInstance> doomed = new HashSet<TServerInstance>();
+              try {
+                checkServer(updates, doomed, path, server, 2);
+              } catch (Exception ex) {
+                log.error(ex, ex);
+              }
+
+              if (!doomed.isEmpty() || !updates.isEmpty())
+                cback.update(LiveTServerSet.this, doomed, updates);
             }
           }
+        });
+        
+        if (children.size() > 0) {
+          checkServer(updates, doomed, path, server, recurse--);
         } else {
-          // Yes... there is no server here any more
-          lock.tryToCancelAsyncLockOrUnlock();
-          if (info != null) {
-            doomed.add(info.instance);
-            current.remove(server);
-            info.cleanup();
-          }
-          try {
-            ZooReaderWriter.getInstance().delete(lockPath, -1);
-          } catch (NotEmptyException ex) {
-            // race condition: tserver created the lock after our last check; 
we'll see it at the next check
-          }
+          if (!serversToDelete.containsKey(server))
+            serversToDelete.put(server, System.currentTimeMillis());
         }
       }
-      // log.debug("Current: " + current.keySet());
-      if (!doomed.isEmpty() || !updates.isEmpty())
-        this.cback.update(this, doomed, updates);
-    } catch (Exception ex) {
-      log.error(ex, ex);
     }
   }
   


Reply via email to