Merge branch '1.5' into 1.6 Conflicts: server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1ea90109 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1ea90109 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1ea90109 Branch: refs/heads/master Commit: 1ea901091f6f443359cd46d5d71cb9eb9ca5db8f Parents: 775cdb1 4ff11d4 Author: Keith Turner <ktur...@apache.org> Authored: Fri Mar 13 08:22:52 2015 -0400 Committer: Keith Turner <ktur...@apache.org> Committed: Fri Mar 13 08:22:52 2015 -0400 ---------------------------------------------------------------------- .../accumulo/server/master/LiveTServerSet.java | 25 ++++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ea90109/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java index 16f9885,0000000..332bfc9 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java @@@ -1,401 -1,0 +1,416 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.master; + +import static com.google.common.base.Charsets.UTF_8; +import static org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy.SKIP; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.core.util.ServerServices; +import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.util.Halt; +import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.accumulo.server.zookeeper.ZooCache; +import org.apache.accumulo.server.zookeeper.ZooLock; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.accumulo.trace.instrument.Tracer; +import org.apache.hadoop.io.Text; +import 
org.apache.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransport; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NotEmptyException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; + +import com.google.common.net.HostAndPort; + +public class LiveTServerSet implements Watcher { + + public interface Listener { + void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added); + } + + private static final Logger log = Logger.getLogger(LiveTServerSet.class); + + private final Listener cback; + private final Instance instance; + private final AccumuloConfiguration conf; + private ZooCache zooCache; + + public class TServerConnection { + private final HostAndPort address; + + public TServerConnection(HostAndPort addr) throws TException { + address = addr; + } + + private String lockString(ZooLock mlock) { + return mlock.getLockID().serialize(ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK); + } + ++ private void loadTablet(TabletClientService.Client client, ZooLock lock, KeyExtent extent) throws TException { ++ client.loadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift()); ++ } ++ + public void assignTablet(ZooLock lock, KeyExtent extent) throws TException { - TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); - try { - client.loadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift()); - } finally { - ThriftUtil.returnClient(client); ++ if (extent.isMeta()) { ++ // see ACCUMULO-3597 ++ TTransport transport = ThriftUtil.createTransport(address, conf); ++ try { ++ TabletClientService.Client client = ThriftUtil.createClient(new 
TabletClientService.Client.Factory(), transport); ++ loadTablet(client, lock, extent); ++ } finally { ++ transport.close(); ++ } ++ } else { ++ TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); ++ try { ++ loadTablet(client, lock, extent); ++ } finally { ++ ThriftUtil.returnClient(client); ++ } + } + } + + public void unloadTablet(ZooLock lock, KeyExtent extent, boolean save) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.unloadTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift(), save); + } finally { + ThriftUtil.returnClient(client); + } + } + + public TabletServerStatus getTableMap(boolean usePooledConnection) throws TException, ThriftSecurityException { + + if (usePooledConnection == true) + throw new UnsupportedOperationException(); + + TTransport transport = ThriftUtil.createTransport(address, conf); + + try { + TabletClientService.Client client = ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport); + return client.getTabletServerStatus(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance)); + } finally { + if (transport != null) + transport.close(); + } + } + + public void halt(ZooLock lock) throws TException, ThriftSecurityException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.halt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock)); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void fastHalt(ZooLock lock) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.fastHalt(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock)); + } finally { 
+ ThriftUtil.returnClient(client); + } + } + + public void flush(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.flush(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId, + startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow)); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void chop(ZooLock lock, KeyExtent extent) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.chop(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift()); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void splitTablet(ZooLock lock, KeyExtent extent, Text splitPoint) throws TException, ThriftSecurityException, NotServingTabletException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.splitTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), extent.toThrift(), + ByteBuffer.wrap(splitPoint.getBytes(), 0, splitPoint.getLength())); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void flushTablet(ZooLock lock, KeyExtent extent) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + client.flushTablet(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), extent.toThrift()); + } finally { + ThriftUtil.returnClient(client); + } + } + + public void compact(ZooLock lock, String tableId, byte[] startRow, byte[] endRow) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new 
TabletClientService.Client.Factory(), address, conf); + try { + client.compact(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), lockString(lock), tableId, + startRow == null ? null : ByteBuffer.wrap(startRow), endRow == null ? null : ByteBuffer.wrap(endRow)); + } finally { + ThriftUtil.returnClient(client); + } + } + + public boolean isActive(long tid) throws TException { + TabletClientService.Client client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + try { + return client.isActive(Tracer.traceInfo(), tid); + } finally { + ThriftUtil.returnClient(client); + } + } + + } + + static class TServerInfo { + TServerConnection connection; + TServerInstance instance; + + TServerInfo(TServerInstance instance, TServerConnection connection) { + this.connection = connection; + this.instance = instance; + } + }; + + // The set of active tservers with locks, indexed by their name in zookeeper + private Map<String,TServerInfo> current = new HashMap<String,TServerInfo>(); + // as above, indexed by TServerInstance + private Map<TServerInstance,TServerInfo> currentInstances = new HashMap<TServerInstance,TServerInfo>(); + + // The set of entries in zookeeper without locks, and the first time each was noticed + private Map<String,Long> locklessServers = new HashMap<String,Long>(); + + public LiveTServerSet(Instance instance, AccumuloConfiguration conf, Listener cback) { + this.cback = cback; + this.instance = instance; + this.conf = conf; + + } + + public synchronized ZooCache getZooCache() { + if (zooCache == null) + zooCache = new ZooCache(this); + return zooCache; + } + + public synchronized void startListeningForTabletServerChanges() { + scanServers(); + SimpleTimer.getInstance().schedule(new Runnable() { + @Override + public void run() { + scanServers(); + } + }, 0, 5000); + } + + public synchronized void scanServers() { + try { + final Set<TServerInstance> updates = new HashSet<TServerInstance>(); + final Set<TServerInstance> 
doomed = new HashSet<TServerInstance>(); + + final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS; + + HashSet<String> all = new HashSet<String>(current.keySet()); + all.addAll(getZooCache().getChildren(path)); + + locklessServers.keySet().retainAll(all); + + for (String zPath : all) { + checkServer(updates, doomed, path, zPath); + } + + // log.debug("Current: " + current.keySet()); + if (!doomed.isEmpty() || !updates.isEmpty()) + this.cback.update(this, doomed, updates); + } catch (Exception ex) { + log.error(ex, ex); + } + } + + private void deleteServerNode(String serverNode) throws InterruptedException, KeeperException { + try { + ZooReaderWriter.getInstance().delete(serverNode, -1); + } catch (NotEmptyException ex) { + // race condition: tserver created the lock after our last check; we'll see it at the next check + } catch (NoNodeException nne) { + // someone else deleted it + } + } + + private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String zPath) + throws TException, InterruptedException, KeeperException { + + TServerInfo info = current.get(zPath); + + final String lockPath = path + "/" + zPath; + Stat stat = new Stat(); + byte[] lockData = ZooLock.getLockData(getZooCache(), lockPath, stat); + + if (lockData == null) { + if (info != null) { + doomed.add(info.instance); + current.remove(zPath); + currentInstances.remove(info.instance); + } + + Long firstSeen = locklessServers.get(zPath); + if (firstSeen == null) { + locklessServers.put(zPath, System.currentTimeMillis()); + } else if (System.currentTimeMillis() - firstSeen > 10 * 60 * 1000) { + deleteServerNode(path + "/" + zPath); + locklessServers.remove(zPath); + } + } else { + locklessServers.remove(zPath); + ServerServices services = new ServerServices(new String(lockData, UTF_8)); + HostAndPort client = services.getAddress(ServerServices.Service.TSERV_CLIENT); + TServerInstance instance = new 
TServerInstance(client, stat.getEphemeralOwner()); + + if (info == null) { + updates.add(instance); + TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client)); + current.put(zPath, tServerInfo); + currentInstances.put(instance, tServerInfo); + } else if (!info.instance.equals(instance)) { + doomed.add(info.instance); + updates.add(instance); + TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client)); + current.put(zPath, tServerInfo); + currentInstances.remove(info.instance); + currentInstances.put(instance, tServerInfo); + } + } + } + + @Override + public void process(WatchedEvent event) { + + // it's important that these events are propagated by ZooCache, because this ensures when reading zoocache that it has already processed the event and cleared + // relevant nodes before code below reads from zoocache + + if (event.getPath() != null) { + if (event.getPath().endsWith(Constants.ZTSERVERS)) { + scanServers(); + } else if (event.getPath().contains(Constants.ZTSERVERS)) { + int pos = event.getPath().lastIndexOf('/'); + + // do only if ZTSERVERS is the parent + if (pos >= 0 && event.getPath().substring(0, pos).endsWith(Constants.ZTSERVERS)) { + + String server = event.getPath().substring(pos + 1); + + final Set<TServerInstance> updates = new HashSet<TServerInstance>(); + final Set<TServerInstance> doomed = new HashSet<TServerInstance>(); + + final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS; + + try { + checkServer(updates, doomed, path, server); + if (!doomed.isEmpty() || !updates.isEmpty()) + this.cback.update(this, doomed, updates); + } catch (Exception ex) { + log.error(ex, ex); + } + } + } + } + } + + public synchronized TServerConnection getConnection(TServerInstance server) { + if (server == null) + return null; + TServerInfo tServerInfo = currentInstances.get(server); + if (tServerInfo == null) + return null; + return tServerInfo.connection; + } + + public synchronized Set<TServerInstance>
getCurrentServers() { + return new HashSet<TServerInstance>(currentInstances.keySet()); + } + + public synchronized int size() { + return current.size(); + } + + public synchronized TServerInstance find(String tabletServer) { + HostAndPort addr = AddressUtil.parseAddress(tabletServer, false); + for (Entry<String,TServerInfo> entry : current.entrySet()) { + if (entry.getValue().instance.getLocation().equals(addr)) + return entry.getValue().instance; + } + return null; + } + + public synchronized void remove(TServerInstance server) { + String zPath = null; + for (Entry<String,TServerInfo> entry : current.entrySet()) { + if (entry.getValue().instance.equals(server)) { + zPath = entry.getKey(); + break; + } + } + if (zPath == null) + return; + current.remove(zPath); + currentInstances.remove(server); + + log.info("Removing zookeeper lock for " + server); + String fullpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + zPath; + try { + ZooReaderWriter.getInstance().recursiveDelete(fullpath, SKIP); + } catch (Exception e) { + String msg = "error removing tablet server lock"; + log.fatal(msg, e); + Halt.halt(msg, -1); + } + getZooCache().clear(fullpath); + } +}