Author: kturner Date: Mon Dec 10 20:07:25 2012 New Revision: 1419712 URL: http://svn.apache.org/viewvc?rev=1419712&view=rev Log: ACCUMULO-879 Shaved ~1sec off Accumulo startup time by reacting to new tservers using zookeeper watchers
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java?rev=1419712&r1=1419711&r2=1419712&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java Mon Dec 10 20:07:25 2012 @@ -22,7 +22,10 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.TimerTask; @@ -51,9 +54,11 @@ import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; import org.apache.thrift.TException; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NotEmptyException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.data.Stat; public class LiveTServerSet implements Watcher { @@ -204,6 +209,7 @@ public class LiveTServerSet implements W // Map from tserver master service to server information private Map<String,TServerInfo> current = new HashMap<String,TServerInfo>(); + private HashMap<String,Long> serversToDelete = new HashMap<String,Long>(); public LiveTServerSet(Instance instance, AccumuloConfiguration conf, Listener cback) { this.cback = cback; @@ -234,64 +240,124 @@ public class LiveTServerSet implements W final Set<TServerInstance> doomed = new HashSet<TServerInstance>(); final String path 
= ZooUtil.getRoot(instance) + Constants.ZTSERVERS; + + Iterator<Entry<String,Long>> serversToDelIter = serversToDelete.entrySet().iterator(); + while (serversToDelIter.hasNext()) { + Entry<String,Long> entry = serversToDelIter.next(); + if (System.currentTimeMillis() - entry.getValue() > 10000) { + String serverNode = path + "/" + entry.getKey(); + serversToDelIter.remove(); + deleteServerNode(serverNode); + } + } + for (String server : getZooCache().getChildren(path)) { - // See if we have an async lock in place? - TServerInfo info = current.get(server); - TServerLockWatcher watcher; - ZooLock lock; - final String lockPath = path + "/" + server; - if (info != null) { - // yep: get out the lock/watcher so we can check on it - watcher = info.watcher; - lock = info.lock; + if (serversToDelete.containsKey(server)) + continue; + + checkServer(updates, doomed, path, server, 2); + } + + // log.debug("Current: " + current.keySet()); + if (!doomed.isEmpty() || !updates.isEmpty()) + this.cback.update(this, doomed, updates); + } catch (Exception ex) { + log.error(ex, ex); + } + } + + private void deleteServerNode(String serverNode) throws InterruptedException, KeeperException { + try { + ZooReaderWriter.getInstance().delete(serverNode, -1); + } catch (NotEmptyException ex) { + // race condition: tserver created the lock after our last check; we'll see it at the next check + } catch (NoNodeException nne) { + // someone else deleted it + } + } + + private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String server, + int recurse) + throws TException, + InterruptedException, KeeperException { + + if (recurse == 0) + return; + + // See if we have an async lock in place? 
+ TServerInfo info = current.get(server); + TServerLockWatcher watcher; + ZooLock lock; + final String lockPath = path + "/" + server; + if (info != null) { + // yep: get out the lock/watcher so we can check on it + watcher = info.watcher; + lock = info.lock; + } else { + // nope: create a new lock and watcher + lock = new ZooLock(lockPath); + watcher = new TServerLockWatcher(); + lock.lockAsync(watcher, "master".getBytes()); + } + TServerInstance instance = null; + // Did we win the lock yet? + if (!lock.isLocked() && !watcher.gotLock && watcher.failureException == null) { + // Nope... there's a server out there: is this is a new server? + if (info == null) { + // Yep: hold onto the information about this server + Stat stat = new Stat(); + byte[] lockData = ZooLock.getLockData(lockPath, stat); + String lockString = new String(lockData == null ? new byte[] {} : lockData); + if (lockString.length() > 0 && !lockString.equals("master")) { + ServerServices services = new ServerServices(new String(lockData)); + InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT); + InetSocketAddress addr = AddressUtil.parseAddress(server, Property.TSERV_CLIENTPORT); + TServerConnection conn = new TServerConnection(addr); + instance = new TServerInstance(client, stat.getEphemeralOwner()); + info = new TServerInfo(lock, instance, conn, watcher); + current.put(server, info); + updates.add(instance); } else { - // nope: create a new lock and watcher - lock = new ZooLock(lockPath); - watcher = new TServerLockWatcher(); - lock.lockAsync(watcher, "master".getBytes()); + lock.tryToCancelAsyncLockOrUnlock(); } - TServerInstance instance = null; - // Did we win the lock yet? - if (!lock.isLocked() && !watcher.gotLock && watcher.failureException == null) { - // Nope... there's a server out there: is this is a new server? 
- if (info == null) { - // Yep: hold onto the information about this server - Stat stat = new Stat(); - byte[] lockData = ZooLock.getLockData(lockPath, stat); - String lockString = new String(lockData == null ? new byte[] {} : lockData); - if (lockString.length() > 0 && !lockString.equals("master")) { - ServerServices services = new ServerServices(new String(lockData)); - InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT); - InetSocketAddress addr = AddressUtil.parseAddress(server, Property.TSERV_CLIENTPORT); - TServerConnection conn = new TServerConnection(addr); - instance = new TServerInstance(client, stat.getEphemeralOwner()); - info = new TServerInfo(lock, instance, conn, watcher); - current.put(server, info); - updates.add(instance); - } else { - lock.tryToCancelAsyncLockOrUnlock(); + } + } else { + // Yes... there is no server here any more + lock.tryToCancelAsyncLockOrUnlock(); + if (info != null) { + // a server existed here and went away so delete its node + doomed.add(info.instance); + current.remove(server); + info.cleanup(); + deleteServerNode(lockPath); + } else { + // never knew of this server before... it could be a new server that has not created its lock node yet... watch and see if it creates the node or + // delete it later if it does not + List<String> children = ZooReaderWriter.getInstance().getChildren(lockPath, new Watcher() { + @Override + public void process(WatchedEvent arg0) { + if (arg0.getType() == EventType.NodeChildrenChanged) { + Set<TServerInstance> updates = new HashSet<TServerInstance>(); + Set<TServerInstance> doomed = new HashSet<TServerInstance>(); + try { + checkServer(updates, doomed, path, server, 2); + } catch (Exception ex) { + log.error(ex, ex); + } + + if (!doomed.isEmpty() || !updates.isEmpty()) + cback.update(LiveTServerSet.this, doomed, updates); } } + }); + + if (children.size() > 0) { + checkServer(updates, doomed, path, server, recurse--); } else { - // Yes... 
there is no server here any more - lock.tryToCancelAsyncLockOrUnlock(); - if (info != null) { - doomed.add(info.instance); - current.remove(server); - info.cleanup(); - } - try { - ZooReaderWriter.getInstance().delete(lockPath, -1); - } catch (NotEmptyException ex) { - // race condition: tserver created the lock after our last check; we'll see it at the next check - } + if (!serversToDelete.containsKey(server)) + serversToDelete.put(server, System.currentTimeMillis()); } } - // log.debug("Current: " + current.keySet()); - if (!doomed.isEmpty() || !updates.isEmpty()) - this.cback.update(this, doomed, updates); - } catch (Exception ex) { - log.error(ex, ex); } }