http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e5064e3/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
----------------------------------------------------------------------
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 5385657,0000000..a90cc61
mode 100644,000000..100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@@ -1,825 -1,0 +1,825 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.monitor;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.gc.thrift.GCMonitorService;
+import org.apache.accumulo.core.gc.thrift.GCStatus;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
- import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.monitor.servlets.DefaultServlet;
+import org.apache.accumulo.monitor.servlets.GcStatusServlet;
+import org.apache.accumulo.monitor.servlets.JSONServlet;
+import org.apache.accumulo.monitor.servlets.LogServlet;
+import org.apache.accumulo.monitor.servlets.MasterServlet;
+import org.apache.accumulo.monitor.servlets.OperationServlet;
+import org.apache.accumulo.monitor.servlets.ProblemServlet;
+import org.apache.accumulo.monitor.servlets.ScanServlet;
+import org.apache.accumulo.monitor.servlets.ShellServlet;
+import org.apache.accumulo.monitor.servlets.TServersServlet;
+import org.apache.accumulo.monitor.servlets.TablesServlet;
+import org.apache.accumulo.monitor.servlets.VisServlet;
+import org.apache.accumulo.monitor.servlets.XMLServlet;
+import org.apache.accumulo.monitor.servlets.trace.ListType;
+import org.apache.accumulo.monitor.servlets.trace.ShowTrace;
+import org.apache.accumulo.monitor.servlets.trace.Summary;
+import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.monitor.LogService;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.server.security.SystemCredentials;
++import org.apache.accumulo.server.security.SecurityUtil;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.util.TableInfoUtil;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * Serve master statistics with an embedded web server.
+ */
+public class Monitor {
+  private static final Logger log = Logger.getLogger(Monitor.class);
+
+  private static final int REFRESH_TIME = 5;
+  private static long lastRecalc = 0L;
+  private static double totalIngestRate = 0.0;
+  private static double totalIngestByteRate = 0.0;
+  private static double totalQueryRate = 0.0;
+  private static double totalScanRate = 0.0;
+  private static double totalQueryByteRate = 0.0;
+  private static long totalEntries = 0L;
+  private static int totalTabletCount = 0;
+  private static int onlineTabletCount = 0;
+  private static long totalHoldTime = 0;
+  private static long totalLookups = 0;
+  private static int totalTables = 0;
+
+  private static class MaxList<T> extends LinkedList<Pair<Long,T>> {
+    private static final long serialVersionUID = 1L;
+
+    private long maxDelta;
+
+    public MaxList(long maxDelta) {
+      this.maxDelta = maxDelta;
+    }
+
+    @Override
+    public boolean add(Pair<Long,T> obj) {
+      boolean result = super.add(obj);
+
+      if (obj.getFirst() - get(0).getFirst() > maxDelta)
+        remove(0);
+
+      return result;
+    }
+
+  }
+
+  private static final int MAX_TIME_PERIOD = 60 * 60 * 1000;
+  private static final List<Pair<Long,Double>> loadOverTime = Collections.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
+  private static final List<Pair<Long,Double>> ingestRateOverTime = Collections.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
+  private static final List<Pair<Long,Double>> ingestByteRateOverTime = Collections.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
+  private static final List<Pair<Long,Integer>> recoveriesOverTime = Collections.synchronizedList(new MaxList<Integer>(MAX_TIME_PERIOD));
+  private static final List<Pair<Long,Integer>> minorCompactionsOverTime = Collections.synchronizedList(new MaxList<Integer>(MAX_TIME_PERIOD));
+  private static final List<Pair<Long,Integer>> majorCompactionsOverTime = Collections.synchronizedList(new MaxList<Integer>(MAX_TIME_PERIOD));
+  private static final List<Pair<Long,Double>> lookupsOverTime = Collections.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
+  private static final List<Pair<Long,Integer>> queryRateOverTime = Collections.synchronizedList(new MaxList<Integer>(MAX_TIME_PERIOD));
+  private static final List<Pair<Long,Integer>> scanRateOverTime = Collections.synchronizedList(new MaxList<Integer>(MAX_TIME_PERIOD));
+  private static final List<Pair<Long,Double>> queryByteRateOverTime = Collections.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
+  private static final List<Pair<Long,Double>> indexCacheHitRateOverTime = Collections.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
+  private static final List<Pair<Long,Double>> dataCacheHitRateOverTime = Collections.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
+  private static EventCounter lookupRateTracker = new EventCounter();
+  private static EventCounter indexCacheHitTracker = new EventCounter();
+  private static EventCounter indexCacheRequestTracker = new EventCounter();
+  private static EventCounter dataCacheHitTracker = new EventCounter();
+  private static EventCounter dataCacheRequestTracker = new EventCounter();
+
+  private static volatile boolean fetching = false;
+  private static MasterMonitorInfo mmi;
+  private static Map<String,Map<ProblemType,Integer>> problemSummary = Collections.emptyMap();
+  private static Exception problemException;
+  private static GCStatus gcStatus;
+
+  private static Instance instance;
+
+  private static ServerConfiguration config;
+
+  private static EmbeddedWebServer server;
+
+  private ZooLock monitorLock;
+
+  private static class EventCounter {
+
+    Map<String,Pair<Long,Long>> prevSamples = new HashMap<String,Pair<Long,Long>>();
+    Map<String,Pair<Long,Long>> samples = new HashMap<String,Pair<Long,Long>>();
+    Set<String> serversUpdated = new HashSet<String>();
+
+    void startingUpdates() {
+      serversUpdated.clear();
+    }
+
+    void updateTabletServer(String name, long sampleTime, long numEvents) {
+      Pair<Long,Long> newSample = new Pair<Long,Long>(sampleTime, numEvents);
+      Pair<Long,Long> lastSample = samples.get(name);
+
+      if (lastSample == null || !lastSample.equals(newSample)) {
+        samples.put(name, newSample);
+        if (lastSample != null) {
+          prevSamples.put(name, lastSample);
+        }
+      }
+      serversUpdated.add(name);
+    }
+
+    void finishedUpdating() {
+      // remove any tablet servers not updated
+      samples.keySet().retainAll(serversUpdated);
+      prevSamples.keySet().retainAll(serversUpdated);
+    }
+
+    double calculateRate() {
+      double totalRate = 0;
+
+      for (Entry<String,Pair<Long,Long>> entry : prevSamples.entrySet()) {
+        Pair<Long,Long> prevSample = entry.getValue();
+        Pair<Long,Long> sample = samples.get(entry.getKey());
+
+        totalRate += (sample.getSecond() - prevSample.getSecond()) / ((sample.getFirst() - prevSample.getFirst()) / (double) 1000);
+      }
+
+      return totalRate;
+    }
+
+    long calculateCount() {
+      long count = 0;
+
+      for (Entry<String,Pair<Long,Long>> entry : prevSamples.entrySet()) {
+        Pair<Long,Long> prevSample = entry.getValue();
+        Pair<Long,Long> sample = samples.get(entry.getKey());
+
+        count += sample.getSecond() - prevSample.getSecond();
+      }
+
+      return count;
+    }
+  }
+
+  public static void fetchData() {
+    double totalIngestRate = 0.;
+    double totalIngestByteRate = 0.;
+    double totalQueryRate = 0.;
+    double totalQueryByteRate = 0.;
+    double totalScanRate = 0.;
+    long totalEntries = 0;
+    int totalTabletCount = 0;
+    int onlineTabletCount = 0;
+    long totalHoldTime = 0;
+    long totalLookups = 0;
+    boolean retry = true;
+
+    // only recalc every so often
+    long currentTime = System.currentTimeMillis();
+    if (currentTime - lastRecalc < REFRESH_TIME * 1000)
+      return;
+
+    synchronized (Monitor.class) {
+      if (fetching)
+        return;
+      fetching = true;
+    }
+
+    try {
+      while (retry) {
+        MasterClientService.Iface client = null;
+        try {
+          client = MasterClient.getConnection(HdfsZooInstance.getInstance());
+          if (client != null) {
+            mmi = client.getMasterStats(Tracer.traceInfo(), SystemCredentials.get().toThrift(HdfsZooInstance.getInstance()));
+            retry = false;
+          } else {
+            mmi = null;
+          }
+          Monitor.gcStatus = fetchGcStatus();
+        } catch (Exception e) {
+          mmi = null;
+          log.info("Error fetching stats: " + e);
+        } finally {
+          if (client != null) {
+            MasterClient.close(client);
+          }
+        }
+        if (mmi == null)
+          UtilWaitThread.sleep(1000);
+      }
+      if (mmi != null) {
+        int majorCompactions = 0;
+        int minorCompactions = 0;
+
+        lookupRateTracker.startingUpdates();
+        indexCacheHitTracker.startingUpdates();
+        indexCacheRequestTracker.startingUpdates();
+        dataCacheHitTracker.startingUpdates();
+        dataCacheRequestTracker.startingUpdates();
+
+        for (TabletServerStatus server : mmi.tServerInfo) {
+          TableInfo summary = TableInfoUtil.summarizeTableStats(server);
+          totalIngestRate += summary.ingestRate;
+          totalIngestByteRate += summary.ingestByteRate;
+          totalQueryRate += summary.queryRate;
+          totalScanRate += summary.scanRate;
+          totalQueryByteRate += summary.queryByteRate;
+          totalEntries += summary.recs;
+          totalHoldTime += server.holdTime;
+          totalLookups += server.lookups;
+          majorCompactions += summary.majors.running;
+          minorCompactions += summary.minors.running;
+          lookupRateTracker.updateTabletServer(server.name, server.lastContact, server.lookups);
+          indexCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheHits);
+          indexCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheRequest);
+          dataCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheHits);
+          dataCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheRequest);
+        }
+
+        lookupRateTracker.finishedUpdating();
+        indexCacheHitTracker.finishedUpdating();
+        indexCacheRequestTracker.finishedUpdating();
+        dataCacheHitTracker.finishedUpdating();
+        dataCacheRequestTracker.finishedUpdating();
+
+        int totalTables = 0;
+        for (TableInfo tInfo : mmi.tableMap.values()) {
+          totalTabletCount += tInfo.tablets;
+          onlineTabletCount += tInfo.onlineTablets;
+          totalTables++;
+        }
+        Monitor.totalIngestRate = totalIngestRate;
+        Monitor.totalTables = totalTables;
+        totalIngestByteRate = totalIngestByteRate / 1000000.0;
+        Monitor.totalIngestByteRate = totalIngestByteRate;
+        Monitor.totalQueryRate = totalQueryRate;
+        Monitor.totalScanRate = totalScanRate;
+        totalQueryByteRate = totalQueryByteRate / 1000000.0;
+        Monitor.totalQueryByteRate = totalQueryByteRate;
+        Monitor.totalEntries = totalEntries;
+        Monitor.totalTabletCount = totalTabletCount;
+        Monitor.onlineTabletCount = onlineTabletCount;
+        Monitor.totalHoldTime = totalHoldTime;
+        Monitor.totalLookups = totalLookups;
+
+        ingestRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestRate));
+        ingestByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestByteRate));
+
+        double totalLoad = 0.;
+        for (TabletServerStatus status : mmi.tServerInfo) {
+          if (status != null)
+            totalLoad += status.osLoad;
+        }
+        loadOverTime.add(new Pair<Long,Double>(currentTime, totalLoad));
+
+        minorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, minorCompactions));
+        majorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, majorCompactions));
+
+        lookupsOverTime.add(new Pair<Long,Double>(currentTime, lookupRateTracker.calculateRate()));
+
+        queryRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalQueryRate));
+        queryByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalQueryByteRate));
+
+        scanRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalScanRate));
+
+        calcCacheHitRate(indexCacheHitRateOverTime, currentTime, indexCacheHitTracker, indexCacheRequestTracker);
+        calcCacheHitRate(dataCacheHitRateOverTime, currentTime, dataCacheHitTracker, dataCacheRequestTracker);
+      }
+      try {
+        Monitor.problemSummary = ProblemReports.getInstance().summarize();
+        Monitor.problemException = null;
+      } catch (Exception e) {
+        log.info("Failed to obtain problem reports ", e);
+        Monitor.problemSummary = Collections.emptyMap();
+        Monitor.problemException = e;
+      }
+
+    } finally {
+      synchronized (Monitor.class) {
+        fetching = false;
+        lastRecalc = currentTime;
+      }
+    }
+  }
+
+  private static void calcCacheHitRate(List<Pair<Long,Double>> hitRate, long currentTime, EventCounter cacheHits, EventCounter cacheReq) {
+    long req = cacheReq.calculateCount();
+    if (req > 0)
+      hitRate.add(new Pair<Long,Double>(currentTime, cacheHits.calculateCount() / (double) cacheReq.calculateCount()));
+    else
+      hitRate.add(new Pair<Long,Double>(currentTime, null));
+  }
+
+  private static GCStatus fetchGcStatus() {
+    GCStatus result = null;
+    HostAndPort address = null;
+    try {
+      // Read the gc location from its lock
+      ZooReaderWriter zk = ZooReaderWriter.getInstance();
+      String path = ZooUtil.getRoot(instance) + Constants.ZGC_LOCK;
+      List<String> locks = zk.getChildren(path, null);
+      if (locks != null && locks.size() > 0) {
+        Collections.sort(locks);
+        address = new ServerServices(new String(zk.getData(path + "/" + locks.get(0), null), UTF_8)).getAddress(Service.GC_CLIENT);
+        GCMonitorService.Client client = ThriftUtil.getClient(new GCMonitorService.Client.Factory(), address, config.getConfiguration());
+        try {
+          result = client.getStatus(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance));
+        } finally {
+          ThriftUtil.returnClient(client);
+        }
+      }
+    } catch (Exception ex) {
+      log.warn("Unable to contact the garbage collector at " + address, ex);
+    }
+    return result;
+  }
+
+  public static void main(String[] args) throws Exception {
+    SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());
+
+    ServerOpts opts = new ServerOpts();
+    final String app = "monitor";
+    opts.parseArgs(app, args);
+    String hostname = opts.getAddress();
+
+    Accumulo.setupLogging(app);
+    VolumeManager fs = VolumeManagerImpl.get();
+    instance = HdfsZooInstance.getInstance();
+    config = new ServerConfiguration(instance);
+    Accumulo.init(fs, config, app);
+    Monitor monitor = new Monitor();
+    Accumulo.enableTracing(hostname, app);
+    monitor.run(hostname);
+  }
+
+  private static long START_TIME;
+
+  public void run(String hostname) {
+    try {
+      getMonitorLock();
+    } catch (Exception e) {
+      log.error("Failed to get Monitor ZooKeeper lock");
+      throw new RuntimeException(e);
+    }
+
+    Monitor.START_TIME = System.currentTimeMillis();
+    int port = config.getConfiguration().getPort(Property.MONITOR_PORT);
+    try {
+      log.debug("Creating monitor on port " + port);
+      server = new EmbeddedWebServer(hostname, port);
+    } catch (Throwable ex) {
+      log.error("Unable to start embedded web server", ex);
+      throw new RuntimeException(ex);
+    }
+
+    server.addServlet(DefaultServlet.class, "/");
+    server.addServlet(OperationServlet.class, "/op");
+    server.addServlet(MasterServlet.class, "/master");
+    server.addServlet(TablesServlet.class, "/tables");
+    server.addServlet(TServersServlet.class, "/tservers");
+    server.addServlet(ProblemServlet.class, "/problems");
+    server.addServlet(GcStatusServlet.class, "/gc");
+    server.addServlet(LogServlet.class, "/log");
+    server.addServlet(XMLServlet.class, "/xml");
+    server.addServlet(JSONServlet.class, "/json");
+    server.addServlet(VisServlet.class, "/vis");
+    server.addServlet(ScanServlet.class, "/scans");
+    server.addServlet(Summary.class, "/trace/summary");
+    server.addServlet(ListType.class, "/trace/listType");
+    server.addServlet(ShowTrace.class, "/trace/show");
+    if (server.isUsingSsl())
+      server.addServlet(ShellServlet.class, "/shell");
+    server.start();
+
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+
+      log.debug("Using " + hostname + " to advertise monitor location in ZooKeeper");
+
+      String monitorAddress = HostAndPort.fromParts(hostname, server.getPort()).toString();
+
+      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMONITOR_HTTP_ADDR, monitorAddress.getBytes(UTF_8),
+          NodeExistsPolicy.OVERWRITE);
+      log.info("Set monitor address in zookeeper to " + monitorAddress);
+    } catch (Exception ex) {
+      log.error("Unable to set monitor HTTP address in zookeeper", ex);
+    }
+
+    if (null != hostname) {
+      LogService.startLogListener(Monitor.getSystemConfiguration(), instance.getInstanceID(), hostname);
+    } else {
+      log.warn("Not starting log4j listener as we could not determine address to use");
+    }
+
+    new Daemon(new LoggingRunnable(log, new ZooKeeperStatus()), "ZooKeeperStatus").start();
+
+    // need to regularly fetch data so plot data is updated
+    new Daemon(new LoggingRunnable(log, new Runnable() {
+
+      @Override
+      public void run() {
+        while (true) {
+          try {
+            Monitor.fetchData();
+          } catch (Exception e) {
+            log.warn(e.getMessage(), e);
+          }
+
+          UtilWaitThread.sleep(333);
+        }
+
+      }
+    }), "Data fetcher").start();
+
+    new Daemon(new LoggingRunnable(log, new Runnable() {
+      @Override
+      public void run() {
+        while (true) {
+          try {
+            Monitor.fetchScans();
+          } catch (Exception e) {
+            log.warn(e.getMessage(), e);
+          }
+          UtilWaitThread.sleep(5000);
+        }
+      }
+    }), "Scan scanner").start();
+  }
+
+  public static class ScanStats {
+    public final long scanCount;
+    public final Long oldestScan;
+    public final long fetched;
+
+    ScanStats(List<ActiveScan> active) {
+      this.scanCount = active.size();
+      long oldest = -1;
+      for (ActiveScan scan : active) {
+        oldest = Math.max(oldest, scan.age);
+      }
+      this.oldestScan = oldest < 0 ? null : oldest;
+      this.fetched = System.currentTimeMillis();
+    }
+  }
+
+  static final Map<String, ScanStats> allScans = new HashMap<String, ScanStats>();
+
+  public static Map<String, ScanStats> getScans() {
+    synchronized (allScans) {
+      return new TreeMap<String, ScanStats>(allScans);
+    }
+  }
+
+  protected static void fetchScans() throws Exception {
+    if (instance == null)
+      return;
+    Connector c = instance.getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken());
+    for (String server : c.instanceOperations().getTabletServers()) {
+      Client tserver = ThriftUtil.getTServerClient(server, Monitor.getSystemConfiguration());
+      try {
+        List<ActiveScan> scans = tserver.getActiveScans(null, SystemCredentials.get().toThrift(instance));
+        synchronized (allScans) {
+          allScans.put(server, new ScanStats(scans));
+        }
+      } catch (Exception ex) {
+        log.debug(ex, ex);
+      } finally {
+        ThriftUtil.returnClient(tserver);
+      }
+    }
+    // Age off old scan information
+    Iterator<Entry<String,ScanStats>> entryIter = allScans.entrySet().iterator();
+    long now = System.currentTimeMillis();
+    while (entryIter.hasNext()) {
+      Entry<String,ScanStats> entry = entryIter.next();
+      if (now - entry.getValue().fetched > 5 * 60 * 1000) {
+        entryIter.remove();
+      }
+    }
+  }
+
+  /**
+   * Get the monitor lock in ZooKeeper
+   */
+  private void getMonitorLock() throws KeeperException, InterruptedException {
+    final String zRoot = ZooUtil.getRoot(instance);
+    final String monitorPath = zRoot + Constants.ZMONITOR;
+    final String monitorLockPath = zRoot + Constants.ZMONITOR_LOCK;
+
+    // Ensure that everything is kosher with ZK as this has changed.
+    ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+    if (zoo.exists(monitorPath)) {
+      byte[] data = zoo.getData(monitorPath, null);
+      // If the node isn't empty, it's from a previous install (has hostname:port for HTTP server)
+      if (0 != data.length) {
+        // Recursively delete from that parent node
+        zoo.recursiveDelete(monitorPath, NodeMissingPolicy.SKIP);
+
+        // And then make the nodes that we expect for the incoming ephemeral nodes
+        zoo.putPersistentData(monitorPath, new byte[0], NodeExistsPolicy.FAIL);
+        zoo.putPersistentData(monitorLockPath, new byte[0], NodeExistsPolicy.FAIL);
+      } else if (!zoo.exists(monitorLockPath)) {
+        // monitor node in ZK exists and is empty as we expect
+        // but the monitor/lock node does not
+        zoo.putPersistentData(monitorLockPath, new byte[0], NodeExistsPolicy.FAIL);
+      }
+    } else {
+      // 1.5.0 and earlier
+      zoo.putPersistentData(zRoot + Constants.ZMONITOR, new byte[0], NodeExistsPolicy.FAIL);
+      if (!zoo.exists(monitorLockPath)) {
+        // Somehow the monitor node exists but not monitor/lock
+        zoo.putPersistentData(monitorLockPath, new byte[0], NodeExistsPolicy.FAIL);
+      }
+    }
+
+    // Get a ZooLock for the monitor
+    while (true) {
+      MonitorLockWatcher monitorLockWatcher = new MonitorLockWatcher();
+      monitorLock = new ZooLock(monitorLockPath);
+      monitorLock.lockAsync(monitorLockWatcher, new byte[0]);
+
+      monitorLockWatcher.waitForChange();
+
+      if (monitorLockWatcher.acquiredLock) {
+        break;
+      }
+
+      if (!monitorLockWatcher.failedToAcquireLock) {
+        throw new IllegalStateException("monitor lock in unknown state");
+      }
+
+      monitorLock.tryToCancelAsyncLockOrUnlock();
+
+      UtilWaitThread.sleep(getSystemConfiguration().getTimeInMillis(Property.MONITOR_LOCK_CHECK_INTERVAL));
+    }
+
+    log.info("Got Monitor lock.");
+  }
+
+  /**
+   * Async Watcher for the monitor lock
+   */
+  private static class MonitorLockWatcher implements ZooLock.AsyncLockWatcher {
+
+    boolean acquiredLock = false;
+    boolean failedToAcquireLock = false;
+
+    @Override
+    public void lostLock(LockLossReason reason) {
+      Halt.halt("Monitor lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
+    }
+
+    @Override
+    public void unableToMonitorLockNode(final Throwable e) {
+      Halt.halt(-1, new Runnable() {
+        @Override
+        public void run() {
+          log.fatal("No longer able to monitor Monitor lock node", e);
+        }
+      });
+
+    }
+
+    @Override
+    public synchronized void acquiredLock() {
+      if (acquiredLock || failedToAcquireLock) {
+        Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1);
+      }
+
+      acquiredLock = true;
+      notifyAll();
+    }
+
+    @Override
+    public synchronized void failedToAcquireLock(Exception e) {
+      log.warn("Failed to get monitor lock " + e);
+
+      if (acquiredLock) {
+        Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, -1);
+      }
+
+      failedToAcquireLock = true;
+      notifyAll();
+    }
+
+    public synchronized void waitForChange() {
+      while (!acquiredLock && !failedToAcquireLock) {
+        try {
+          wait();
+        } catch (InterruptedException e) {}
+      }
+    }
+  }
+
+  public static MasterMonitorInfo getMmi() {
+    return mmi;
+  }
+
+  public static int getTotalTables() {
+    return totalTables;
+  }
+
+  public static int getTotalTabletCount() {
+    return totalTabletCount;
+  }
+
+  public static int getOnlineTabletCount() {
+    return onlineTabletCount;
+  }
+
+  public static long getTotalEntries() {
+    return totalEntries;
+  }
+
+  public static double getTotalIngestRate() {
+    return totalIngestRate;
+  }
+
+  public static double getTotalIngestByteRate() {
+    return totalIngestByteRate;
+  }
+
+  public static double getTotalQueryRate() {
+    return totalQueryRate;
+  }
+
+  public static double getTotalScanRate() {
+    return totalScanRate;
+  }
+
+  public static double getTotalQueryByteRate() {
+    return totalQueryByteRate;
+  }
+
+  public static long getTotalHoldTime() {
+    return totalHoldTime;
+  }
+
+  public static Exception getProblemException() {
+    return problemException;
+  }
+
+  public static Map<String,Map<ProblemType,Integer>> getProblemSummary() {
+    return problemSummary;
+  }
+
+  public static GCStatus getGcStatus() {
+    return gcStatus;
+  }
+
+  public static long getTotalLookups() {
+    return totalLookups;
+  }
+
+  public static long getStartTime() {
+    return START_TIME;
+  }
+
+  public static List<Pair<Long,Double>> getLoadOverTime() {
+    synchronized (loadOverTime) {
+      return new ArrayList<Pair<Long,Double>>(loadOverTime);
+    }
+  }
+
+  public static List<Pair<Long,Double>> getIngestRateOverTime() {
+    synchronized (ingestRateOverTime) {
+      return new ArrayList<Pair<Long,Double>>(ingestRateOverTime);
+    }
+  }
+
+  public static List<Pair<Long,Double>> getIngestByteRateOverTime() {
+    synchronized (ingestByteRateOverTime) {
+      return new ArrayList<Pair<Long,Double>>(ingestByteRateOverTime);
+    }
+  }
+
+  public static List<Pair<Long,Integer>> getRecoveriesOverTime() {
+    synchronized (recoveriesOverTime) {
+      return new ArrayList<Pair<Long,Integer>>(recoveriesOverTime);
+    }
+  }
+
+  public static List<Pair<Long,Integer>> getMinorCompactionsOverTime() {
+    synchronized (minorCompactionsOverTime) {
+      return new ArrayList<Pair<Long,Integer>>(minorCompactionsOverTime);
+    }
+  }
+
+  public static List<Pair<Long,Integer>> getMajorCompactionsOverTime() {
+    synchronized (majorCompactionsOverTime) {
+      return new ArrayList<Pair<Long,Integer>>(majorCompactionsOverTime);
+    }
+  }
+
+  public static List<Pair<Long,Double>> getLookupsOverTime() {
+    synchronized (lookupsOverTime) {
+      return new ArrayList<Pair<Long,Double>>(lookupsOverTime);
+    }
+  }
+
+  public static double getLookupRate() {
+    return lookupRateTracker.calculateRate();
+  }
+
+  public static List<Pair<Long,Integer>> getQueryRateOverTime() {
+    synchronized (queryRateOverTime) {
+      return new ArrayList<Pair<Long,Integer>>(queryRateOverTime);
+    }
+  }
+
+  public static List<Pair<Long,Integer>> getScanRateOverTime() {
+    synchronized (scanRateOverTime) {
+      return new ArrayList<Pair<Long,Integer>>(scanRateOverTime);
+    }
+  }
+
+  public static List<Pair<Long,Double>> getQueryByteRateOverTime() {
+    synchronized (queryByteRateOverTime) {
+      return new ArrayList<Pair<Long,Double>>(queryByteRateOverTime);
+    }
+  }
+
+  public static List<Pair<Long,Double>> getIndexCacheHitRateOverTime() {
+    synchronized (indexCacheHitRateOverTime) {
+      return new ArrayList<Pair<Long,Double>>(indexCacheHitRateOverTime);
+    }
+  }
+
+  public static List<Pair<Long,Double>> getDataCacheHitRateOverTime() {
+    synchronized (dataCacheHitRateOverTime) {
+      return new ArrayList<Pair<Long,Double>>(dataCacheHitRateOverTime);
+    }
+  }
+
+  public static AccumuloConfiguration getSystemConfiguration() {
+    return config.getConfiguration();
+  }
+
+  public static Instance getInstance() {
+    return instance;
+  }
+
+  public static boolean isUsingSsl() {
+    return server.isUsingSsl();
+  }
+}
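
Aside on the MaxList used for the *OverTime series above: it is a time-bounded sliding window, where add() prunes the head once it falls more than maxDelta milliseconds behind the newest sample. It removes at most one stale entry per add, which stays bounded only because the monitor samples on a steady cadence. A minimal self-contained sketch of the same idea (the TimeWindowList name and its main() demo are illustrative, not part of this commit):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.LinkedList;
    import java.util.Map.Entry;

    /** Illustrative sliding window of (timestampMillis, value) samples. */
    class TimeWindowList<V> extends LinkedList<Entry<Long,V>> {
      private static final long serialVersionUID = 1L;
      private final long windowMillis;

      TimeWindowList(long windowMillis) {
        this.windowMillis = windowMillis;
      }

      boolean add(long timestamp, V value) {
        boolean result = super.add(new SimpleEntry<Long,V>(timestamp, value));
        // Drop every expired sample, not just one, so the window stays bounded
        // even if adds arrive irregularly.
        while (timestamp - getFirst().getKey() > windowMillis)
          removeFirst();
        return result;
      }

      public static void main(String[] args) {
        TimeWindowList<Double> load = new TimeWindowList<Double>(60 * 60 * 1000L);
        load.add(System.currentTimeMillis(), 0.75); // keep one hour of samples
        System.out.println(load.size()); // 1
      }
    }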
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e5064e3/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --cc server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index 70e8d7d,0000000..412f551
mode 100644,000000..100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@@ -1,331 -1,0 +1,331 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tracer;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.AgeOffFilter;
- import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.trace.TraceFormatter;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
++import org.apache.accumulo.server.security.SecurityUtil;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.thrift.RemoteSpan;
+import org.apache.accumulo.trace.thrift.SpanReceiver.Iface;
+import org.apache.accumulo.trace.thrift.SpanReceiver.Processor;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+public class TraceServer implements Watcher {
+
+  final private static Logger log = Logger.getLogger(TraceServer.class);
+  final private ServerConfiguration serverConfiguration;
+  final private TServer server;
+  final private AtomicReference<BatchWriter> writer;
+  final private Connector connector;
+  final String table;
+
+  private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) {
+    m.put(new Text(cf), new Text(cq), new Value(bytes, 0, len));
+  }
+
+  static class ByteArrayTransport extends TTransport {
+    TByteArrayOutputStream out = new TByteArrayOutputStream();
+
+    @Override
+    public boolean isOpen() {
+      return true;
+    }
+
+    @Override
+    public void open() throws TTransportException {}
+
+    @Override
+    public void close() {}
+
+    @Override
+    public int read(byte[] buf, int off, int len) {
+      return 0;
+    }
+
+    @Override
+    public void write(byte[] buf, int off, int len) throws TTransportException {
+      out.write(buf, off, len);
+    }
+
+    public byte[] get() {
+      return out.get();
+    }
+
+    public int len() {
+      return out.len();
+    }
+  }
+
+  class Receiver implements Iface {
+    @Override
+    public void span(RemoteSpan s) throws TException {
+      String idString = Long.toHexString(s.traceId);
+      String startString = Long.toHexString(s.start);
+      Mutation spanMutation = new Mutation(new Text(idString));
+      Mutation indexMutation = new Mutation(new Text("idx:" + s.svc + ":" + startString));
+      long diff = s.stop - s.start;
+      indexMutation.put(new Text(s.description), new Text(s.sender), new Value((idString + ":" + Long.toHexString(diff)).getBytes(UTF_8)));
+      ByteArrayTransport transport = new ByteArrayTransport();
+      TCompactProtocol protocol = new TCompactProtocol(transport);
+      s.write(protocol);
+      String parentString = Long.toHexString(s.parentId);
+      if (s.parentId == Span.ROOT_SPAN_ID)
+        parentString = "";
+      put(spanMutation, "span", parentString + ":" + Long.toHexString(s.spanId), transport.get(), transport.len());
+      // Map the root span to time so we can look up traces by time
+      Mutation timeMutation = null;
+      if (s.parentId == Span.ROOT_SPAN_ID) {
+        timeMutation = new Mutation(new Text("start:" + startString));
+        put(timeMutation, "id", idString, transport.get(), transport.len());
+      }
+      try {
+        final BatchWriter writer = TraceServer.this.writer.get();
+        /*
+         * Check for null, because we expect spans to come in much faster than flush calls. In the case of failure, we'd rather avoid logging tons of NPEs.
+         */
+        if (null == writer) {
+          log.warn("writer is not ready; discarding span.");
+          return;
+        }
+        writer.addMutation(spanMutation);
+        writer.addMutation(indexMutation);
+        if (timeMutation != null)
+          writer.addMutation(timeMutation);
+      } catch (MutationsRejectedException exception) {
+        log.warn("Unable to write mutation to table; discarding span. Set log level to DEBUG for span information and stacktrace. cause: " + exception);
+        if (log.isDebugEnabled()) {
+          log.debug("discarded span due to rejection of mutation: " + spanMutation, exception);
+        }
+        /* XXX this could be e.g. an IllegalArgumentException if we're trying to write this mutation to a writer that has been closed since we retrieved it */
+      } catch (RuntimeException exception) {
+        log.warn("Unable to write mutation to table; discarding span. Set log level to DEBUG for stacktrace. cause: " + exception);
+        log.debug("unable to write mutation to table due to exception.", exception);
+      }
+    }
+
+  }
+
+  public TraceServer(ServerConfiguration serverConfiguration, String hostname) throws Exception {
+    this.serverConfiguration = serverConfiguration;
+    AccumuloConfiguration conf = serverConfiguration.getConfiguration();
+    table = conf.get(Property.TRACE_TABLE);
+    Connector connector = null;
+    while (true) {
+      try {
+        String principal = conf.get(Property.TRACE_USER);
+        AuthenticationToken at;
+        Map<String,String> loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);
+        if (loginMap.isEmpty()) {
+          Property p = Property.TRACE_PASSWORD;
+          at = new PasswordToken(conf.get(p).getBytes(UTF_8));
+        } else {
+          Properties props = new Properties();
+          AuthenticationToken token = AccumuloVFSClassLoader.getClassLoader().loadClass(conf.get(Property.TRACE_TOKEN_TYPE)).asSubclass(AuthenticationToken.class)
+              .newInstance();
+
+          int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length();
+          for (Entry<String,String> entry : loginMap.entrySet()) {
+            props.put(entry.getKey().substring(prefixLength), entry.getValue());
+          }
+
+          token.init(props);
+
+          at = token;
+        }
+
+        connector = serverConfiguration.getInstance().getConnector(principal, at);
+        if (!connector.tableOperations().exists(table)) {
+          connector.tableOperations().create(table);
+          IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName());
+          AgeOffFilter.setTTL(setting, 7 * 24 * 60 * 60 * 1000L);
+          connector.tableOperations().attachIterator(table, setting);
+        }
+        connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
+        break;
+      } catch (Exception ex) {
+        log.info("Waiting to check/create the trace table.", ex);
+        UtilWaitThread.sleep(1000);
+      }
+    }
+    this.connector = connector;
+    // make sure we refer to the final variable from now on.
+    connector = null;
+
+    int port = conf.getPort(Property.TRACE_PORT);
+    final ServerSocket sock = ServerSocketChannel.open().socket();
+    sock.setReuseAddress(true);
+    sock.bind(new InetSocketAddress(hostname, port));
+    final TServerTransport transport = new TServerSocket(sock);
+    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+    options.processor(new Processor<Iface>(new Receiver()));
+    server = new TThreadPoolServer(options);
+    registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort());
+    writer = new AtomicReference<BatchWriter>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS)));
+  }
+
+  public void run() throws Exception {
+    SimpleTimer.getInstance().schedule(new Runnable() {
+      @Override
+      public void run() {
+        flush();
+      }
+    }, 1000, 1000);
+    server.serve();
+  }
+
+  private void flush() {
+    try {
+      final BatchWriter writer = this.writer.get();
+      if (null != writer) {
+        writer.flush();
+      } else {
+        // We don't have a writer. If the table exists, try to make a new writer.
+        if (connector.tableOperations().exists(table)) {
+          resetWriter();
+        }
+      }
+    } catch (MutationsRejectedException exception) {
+      log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);
+      log.debug("flushing traces failed due to exception", exception);
+      resetWriter();
+      /* XXX e.g. if the writer was closed between when we grabbed it and when we called flush. */
+    } catch (RuntimeException exception) {
+      log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);
+      log.debug("flushing traces failed due to exception", exception);
+      resetWriter();
+    }
+  }
+
+  private void resetWriter() {
+    BatchWriter writer = null;
+    try {
+      writer = connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS));
+    } catch (Exception ex) {
+      log.warn("Unable to create a batch writer, will retry. Set log level to DEBUG to see stacktrace. cause: " + ex);
+      log.debug("batch writer creation failed with exception.", ex);
+    } finally {
+      /* Trade in the new writer (even if null) for the one we need to close. */
+      writer = this.writer.getAndSet(writer);
+      try {
+        if (null != writer) {
+          writer.close();
+        }
+      } catch (Exception ex) {
+        log.warn("Problem closing batch writer. Set log level to DEBUG to see stacktrace. cause: " + ex);
+        log.debug("batch writer close failed with exception", ex);
+      }
+    }
+  }
+
+  private void registerInZooKeeper(String name) throws Exception {
+    String root = ZooUtil.getRoot(serverConfiguration.getInstance()) + Constants.ZTRACERS;
+    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+    String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes(UTF_8));
+    zoo.exists(path, this);
+  }
+
+  public static void main(String[] args) throws Exception {
+    SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());
+    ServerOpts opts = new ServerOpts();
+    final String app = "tracer";
+    opts.parseArgs(app, args);
+    Accumulo.setupLogging(app);
+    Instance instance = HdfsZooInstance.getInstance();
+    ServerConfiguration conf = new ServerConfiguration(instance);
+    VolumeManager fs = VolumeManagerImpl.get();
+    Accumulo.init(fs, conf, app);
+    String hostname = opts.getAddress();
+    TraceServer server = new TraceServer(conf, hostname);
+    Accumulo.enableTracing(hostname, app);
+    server.run();
+    log.info("tracer stopping");
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
+    if (event.getState() == KeeperState.Expired) {
+      log.warn("Trace server lost zookeeper registration at " + event.getPath());
+      server.stop();
+    } else if (event.getType() == EventType.NodeDeleted) {
+      log.warn("Trace server zookeeper entry lost " + event.getPath());
+      server.stop();
+    }
+    if (event.getPath() != null) {
+      try {
+        if (ZooReaderWriter.getInstance().exists(event.getPath(), this))
+          return;
+      } catch (Exception ex) {
+        log.error(ex, ex);
+      }
+      log.warn("Trace server unable to reset watch on zookeeper registration");
+      server.stop();
+    }
+  }
+
+}
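
Aside on resetWriter() above: publishing the replacement BatchWriter through AtomicReference.getAndSet means concurrent span() and flush() callers always observe either the old writer or the new one, and the displaced instance is closed exactly once by the thread doing the swap. A minimal sketch of that swap-then-close pattern (ResourceSwapper is an illustrative name, not from this commit):

    import java.util.concurrent.atomic.AtomicReference;

    /** Illustrative holder that atomically swaps a resource and closes the displaced one. */
    class ResourceSwapper<R extends AutoCloseable> {
      private final AtomicReference<R> current = new AtomicReference<R>();

      /** Install a replacement (possibly null) and close whatever was there before. */
      void reset(R replacement) {
        // getAndSet is atomic: readers see the old or the new resource, never a
        // half-swapped state, and only this thread receives the displaced instance.
        R displaced = current.getAndSet(replacement);
        if (displaced != null) {
          try {
            displaced.close();
          } catch (Exception e) {
            // Best effort: the displaced resource may already be broken,
            // which is often why it is being replaced.
          }
        }
      }

      R get() {
        return current.get();
      }
    }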