Merge branch '1.6' into 1.7

Conflicts:
        server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
        server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
        server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
        server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
        server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/01cdd020
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/01cdd020
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/01cdd020

Branch: refs/heads/master
Commit: 01cdd0205657a61d59a80294e0f22aa010fdf966
Parents: acf68ad e1e4100
Author: Josh Elser <els...@apache.org>
Authored: Thu Dec 31 00:08:46 2015 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Thu Dec 31 00:08:46 2015 -0500

----------------------------------------------------------------------
 .../accumulo/core/client/BatchWriter.java       |  5 ++++-
 .../file/blockfile/cache/LruBlockCache.java     | 22 +++++++++-----------
 pom.xml                                         |  2 +-
 .../accumulo/gc/SimpleGarbageCollector.java     |  6 +++++-
 .../org/apache/accumulo/monitor/Monitor.java    |  2 ++
 .../org/apache/accumulo/tracer/TraceServer.java |  3 +++
 .../apache/accumulo/tserver/TabletServer.java   |  2 ++
 .../accumulo/tserver/tablet/Compactor.java      | 11 ++++++----
 .../accumulo/tserver/tablet/MinorCompactor.java | 19 +++++++++--------
 9 files changed, 44 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/01cdd020/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01cdd020/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01cdd020/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 6bb8f9a,b98daf7..48633cb
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@@ -138,11 -133,16 +139,14 @@@ public class SimpleGarbageCollector ext
  
    private GCStatus status = new GCStatus(new GcCycleStats(), new 
GcCycleStats(), new GcCycleStats(), new GcCycleStats());
  
 -  private Instance instance;
 -
    public static void main(String[] args) throws UnknownHostException, 
IOException {
 -    SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration());
 +    SecurityUtil.serverLogin(SiteConfiguration.getInstance());
      final String app = "gc";
      Accumulo.setupLogging(app);
-     ServerConfigurationFactory conf = new 
ServerConfigurationFactory(HdfsZooInstance.getInstance());
+     Instance instance = HdfsZooInstance.getInstance();
++    ServerConfigurationFactory conf = new 
ServerConfigurationFactory(instance);
+     log.info("Version " + Constants.VERSION);
+     log.info("Instance " + instance.getInstanceID());
 -    ServerConfiguration conf = new ServerConfiguration(instance);
      final VolumeManager fs = VolumeManagerImpl.get();
      Accumulo.init(fs, conf, app);
      Opts opts = new Opts();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01cdd020/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
----------------------------------------------------------------------
diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 24e2e3f,4e162c3..c3dd773
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@@ -427,16 -432,13 +427,18 @@@ public class Monitor 
      Accumulo.setupLogging(app);
      VolumeManager fs = VolumeManagerImpl.get();
      instance = HdfsZooInstance.getInstance();
 -    config = new ServerConfiguration(instance);
 +    config = new ServerConfigurationFactory(instance);
 +    context = new AccumuloServerContext(config);
+     log.info("Version " + Constants.VERSION);
+     log.info("Instance " + instance.getInstanceID());
      Accumulo.init(fs, config, app);
      Monitor monitor = new Monitor();
 -    Accumulo.enableTracing(hostname, app);
 -    monitor.run(hostname);
 +    DistributedTrace.enable(hostname, app, config.getConfiguration());
 +    try {
 +      monitor.run(hostname);
 +    } finally {
 +      DistributedTrace.disable();
 +    }
    }
  
    private static long START_TIME;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01cdd020/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --cc server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index 7511f3c,1df10a3..2d5d68d
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@@ -175,8 -170,10 +176,10 @@@ public class TraceServer implements Wat
  
    }
  
 -  public TraceServer(ServerConfiguration serverConfiguration, String 
hostname) throws Exception {
 +  public TraceServer(ServerConfigurationFactory serverConfiguration, String 
hostname) throws Exception {
      this.serverConfiguration = serverConfiguration;
+     log.info("Version " + Constants.VERSION);
+     log.info("Instance " + serverConfiguration.getInstance().getInstanceID());
      AccumuloConfiguration conf = serverConfiguration.getConfiguration();
      table = conf.get(Property.TRACE_TABLE);
      Connector connector = null;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01cdd020/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 1080d8d,29cf0d3..1e0d119
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -256,149 -255,734 +256,151 @@@ public class TabletServer extends Accum
    private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
    private static final long TIME_BETWEEN_GC_CHECKS = 5000;
    private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = 60 * 60 * 
1000;
 +  private static final Set<Column> EMPTY_COLUMNS = Collections.emptySet();
  
 -  private TabletServerLogger logger;
 -
 -  protected TabletServerMinCMetrics mincMetrics = new 
TabletServerMinCMetrics();
 -
 -  private ServerConfiguration serverConfig;
 -  private LogSorter logSorter = null;
 -
 -  public TabletServer(ServerConfiguration conf, VolumeManager fs) {
 -    super();
 -    this.serverConfig = conf;
 -    this.instance = conf.getInstance();
 -    this.fs = fs;
 -
 -    log.info("Version " + Constants.VERSION);
 -    log.info("Instance " + instance.getInstanceID());
 -
 -    this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
 -    SimpleTimer.getInstance().schedule(new Runnable() {
 -      @Override
 -      public void run() {
 -        synchronized (onlineTablets) {
 -          long now = System.currentTimeMillis();
 -          for (Tablet tablet : onlineTablets.values())
 -            try {
 -              tablet.updateRates(now);
 -            } catch (Exception ex) {
 -              log.error(ex, ex);
 -            }
 -        }
 -      }
 -    }, TIME_BETWEEN_GC_CHECKS, TIME_BETWEEN_GC_CHECKS);
 -    SimpleTimer.getInstance().schedule(new Runnable() {
 -      @Override
 -      public void run() {
 -        TabletLocator.clearLocators();
 -      }
 -    }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), 
jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
 -  }
 -
 -  private static long jitter(long ms) {
 -    Random r = new Random();
 -    // add a random 10% wait
 -    return (long) ((1. + (r.nextDouble() / 10)) * ms);
 -  }
 -
 -  private synchronized static void logGCInfo(AccumuloConfiguration conf) {
 -    long now = System.currentTimeMillis();
 -
 -    List<GarbageCollectorMXBean> gcmBeans = 
ManagementFactory.getGarbageCollectorMXBeans();
 -    Runtime rt = Runtime.getRuntime();
 -
 -    StringBuilder sb = new StringBuilder("gc");
 -
 -    boolean sawChange = false;
 -
 -    long maxIncreaseInCollectionTime = 0;
 -
 -    for (GarbageCollectorMXBean gcBean : gcmBeans) {
 -      Long prevTime = prevGcTime.get(gcBean.getName());
 -      long pt = 0;
 -      if (prevTime != null) {
 -        pt = prevTime;
 -      }
 -
 -      long time = gcBean.getCollectionTime();
 -
 -      if (time - pt != 0) {
 -        sawChange = true;
 -      }
 -
 -      long increaseInCollectionTime = time - pt;
 -      sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), 
time / 1000.0, increaseInCollectionTime / 1000.0));
 -      maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, 
maxIncreaseInCollectionTime);
 -      prevGcTime.put(gcBean.getName(), time);
 -    }
 -
 -    long mem = rt.freeMemory();
 -    if (maxIncreaseInCollectionTime == 0) {
 -      gcTimeIncreasedCount = 0;
 -    } else {
 -      gcTimeIncreasedCount++;
 -      if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 0.05) {
 -        log.warn("Running low on memory");
 -        gcTimeIncreasedCount = 0;
 -      }
 -    }
 -
 -    if (mem > lastMemorySize) {
 -      sawChange = true;
 -    }
 +  private final GarbageCollectionLogger gcLogger = new 
GarbageCollectionLogger();
 +  private final TransactionWatcher watcher = new TransactionWatcher();
 +  private final ZooCache masterLockCache = new ZooCache();
  
 -    String sign = "+";
 -    if (mem - lastMemorySize <= 0) {
 -      sign = "";
 -    }
 -
 -    sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, 
(mem - lastMemorySize), rt.totalMemory()));
 +  private final TabletServerLogger logger;
  
 -    if (sawChange) {
 -      log.debug(sb.toString());
 -    }
 -
 -    final long keepAliveTimeout = 
conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
 -    if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) {
 -      long diff = now - lastMemoryCheckTime;
 -      if (diff > keepAliveTimeout + 1000) {
 -        log.warn(String.format("GC pause checker not called in a timely 
fashion. Expected every %.1f seconds but was %.1f seconds since last check",
 -            TIME_BETWEEN_GC_CHECKS / 1000., diff / 1000.));
 -      }
 -      lastMemoryCheckTime = now;
 -      return;
 -    }
 -
 -    if (maxIncreaseInCollectionTime > keepAliveTimeout) {
 -      Halt.halt("Garbage collection may be interfering with lock keep-alive.  
Halting.", -1);
 -    }
 +  private final TabletServerMetricsFactory metricsFactory;
 +  private final Metrics updateMetrics;
 +  private final Metrics scanMetrics;
 +  private final Metrics mincMetrics;
  
 -    lastMemorySize = mem;
 -    lastMemoryCheckTime = now;
 +  public Metrics getMinCMetrics() {
 +    return mincMetrics;
    }
  
 -  private TabletStatsKeeper statsKeeper;
 -
 -  private static class Session {
 -    long lastAccessTime;
 -    long startTime;
 -    String user;
 -    String client = TServerUtils.clientAddress.get();
 -    public boolean reserved;
 -
 -    public void cleanup() {}
 -  }
 -
 -  private static class SessionManager {
 -
 -    SecureRandom random;
 -    Map<Long,Session> sessions;
 -    long maxIdle;
 -
 -    SessionManager(AccumuloConfiguration conf) {
 -      random = new SecureRandom();
 -      sessions = new HashMap<Long,Session>();
 -
 -      maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
 -
 -      Runnable r = new Runnable() {
 -        @Override
 -        public void run() {
 -          sweep(maxIdle);
 -        }
 -      };
 -
 -      SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
 -    }
 -
 -    synchronized long createSession(Session session, boolean reserve) {
 -      long sid = random.nextLong();
 -
 -      while (sessions.containsKey(sid)) {
 -        sid = random.nextLong();
 -      }
 -
 -      sessions.put(sid, session);
 -
 -      session.reserved = reserve;
 -
 -      session.startTime = session.lastAccessTime = System.currentTimeMillis();
 -
 -      return sid;
 -    }
 -
 -    long getMaxIdleTime() {
 -      return maxIdle;
 -    }
 -
 -    /**
 -     * while a session is reserved, it cannot be canceled or removed
 -     */
 -    synchronized Session reserveSession(long sessionId) {
 -      Session session = sessions.get(sessionId);
 -      if (session != null) {
 -        if (session.reserved)
 -          throw new IllegalStateException();
 -        session.reserved = true;
 -      }
 -
 -      return session;
 -
 -    }
 -
 -    synchronized Session reserveSession(long sessionId, boolean wait) {
 -      Session session = sessions.get(sessionId);
 -      if (session != null) {
 -        while (wait && session.reserved) {
 -          try {
 -            wait(1000);
 -          } catch (InterruptedException e) {
 -            throw new RuntimeException();
 -          }
 -        }
 -
 -        if (session.reserved)
 -          throw new IllegalStateException();
 -        session.reserved = true;
 -      }
 -
 -      return session;
 -
 -    }
 -
 -    synchronized void unreserveSession(Session session) {
 -      if (!session.reserved)
 -        throw new IllegalStateException();
 -      notifyAll();
 -      session.reserved = false;
 -      session.lastAccessTime = System.currentTimeMillis();
 -    }
 -
 -    synchronized void unreserveSession(long sessionId) {
 -      Session session = getSession(sessionId);
 -      if (session != null)
 -        unreserveSession(session);
 -    }
 -
 -    synchronized Session getSession(long sessionId) {
 -      Session session = sessions.get(sessionId);
 -      if (session != null)
 -        session.lastAccessTime = System.currentTimeMillis();
 -      return session;
 -    }
 -
 -    Session removeSession(long sessionId) {
 -      return removeSession(sessionId, false);
 -    }
 -
 -    Session removeSession(long sessionId, boolean unreserve) {
 -      Session session = null;
 -      synchronized (this) {
 -        session = sessions.remove(sessionId);
 -        if (unreserve && session != null)
 -          unreserveSession(session);
 -      }
 -
 -      // do clean up out side of lock..
 -      if (session != null)
 -        session.cleanup();
 -
 -      return session;
 -    }
 -
 -    private void sweep(long maxIdle) {
 -      ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
 -      synchronized (this) {
 -        Iterator<Session> iter = sessions.values().iterator();
 -        while (iter.hasNext()) {
 -          Session session = iter.next();
 -          long idleTime = System.currentTimeMillis() - session.lastAccessTime;
 -          if (idleTime > maxIdle && !session.reserved) {
 -            log.info("Closing idle session from user=" + session.user + ", 
client=" + session.client + ", idle=" + idleTime + "ms");
 -            iter.remove();
 -            sessionsToCleanup.add(session);
 -          }
 -        }
 -      }
 -
 -      // do clean up outside of lock
 -      for (Session session : sessionsToCleanup) {
 -        session.cleanup();
 -      }
 -    }
 -
 -    synchronized void removeIfNotAccessed(final long sessionId, final long 
delay) {
 -      Session session = sessions.get(sessionId);
 -      if (session != null) {
 -        final long removeTime = session.lastAccessTime;
 -        TimerTask r = new TimerTask() {
 -          @Override
 -          public void run() {
 -            Session sessionToCleanup = null;
 -            synchronized (SessionManager.this) {
 -              Session session2 = sessions.get(sessionId);
 -              if (session2 != null && session2.lastAccessTime == removeTime 
&& !session2.reserved) {
 -                log.info("Closing not accessed session from user=" + 
session2.user + ", client=" + session2.client + ", duration=" + delay + "ms");
 -                sessions.remove(sessionId);
 -                sessionToCleanup = session2;
 -              }
 -            }
 -
 -            // call clean up outside of lock
 -            if (sessionToCleanup != null)
 -              sessionToCleanup.cleanup();
 -          }
 -        };
 -
 -        SimpleTimer.getInstance().schedule(r, delay);
 -      }
 -    }
 -
 -    public synchronized Map<String,MapCounter<ScanRunState>> 
getActiveScansPerTable() {
 -      Map<String,MapCounter<ScanRunState>> counts = new 
HashMap<String,MapCounter<ScanRunState>>();
 -      for (Entry<Long,Session> entry : sessions.entrySet()) {
 -
 -        Session session = entry.getValue();
 -        @SuppressWarnings("rawtypes")
 -        ScanTask nbt = null;
 -        String tableID = null;
 -
 -        if (session instanceof ScanSession) {
 -          ScanSession ss = (ScanSession) session;
 -          nbt = ss.nextBatchTask;
 -          tableID = ss.extent.getTableId().toString();
 -        } else if (session instanceof MultiScanSession) {
 -          MultiScanSession mss = (MultiScanSession) session;
 -          nbt = mss.lookupTask;
 -          tableID = mss.threadPoolExtent.getTableId().toString();
 -        }
 -
 -        if (nbt == null)
 -          continue;
 +  private final LogSorter logSorter;
 +  private ReplicationWorker replWorker = null;
 +  private final TabletStatsKeeper statsKeeper;
 +  private final AtomicInteger logIdGenerator = new AtomicInteger();
  
 -        ScanRunState srs = nbt.getScanRunState();
 +  private final AtomicLong flushCounter = new AtomicLong(0);
 +  private final AtomicLong syncCounter = new AtomicLong(0);
  
 -        if (srs == ScanRunState.FINISHED)
 -          continue;
 +  private final VolumeManager fs;
  
 -        MapCounter<ScanRunState> stateCounts = counts.get(tableID);
 -        if (stateCounts == null) {
 -          stateCounts = new MapCounter<ScanRunState>();
 -          counts.put(tableID, stateCounts);
 -        }
 +  private final SortedMap<KeyExtent,Tablet> onlineTablets = 
Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
 +  private final SortedSet<KeyExtent> unopenedTablets = 
Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
 +  private final SortedSet<KeyExtent> openingTablets = 
Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
 +  @SuppressWarnings("unchecked")
 +  private final Map<KeyExtent,Long> recentlyUnloadedCache = 
Collections.synchronizedMap(new LRUMap(1000));
  
 -        stateCounts.increment(srs, 1);
 -      }
 +  private final TabletServerResourceManager resourceManager;
 +  private final SecurityOperation security;
  
 -      return counts;
 -    }
 +  private final BlockingDeque<MasterMessage> masterMessages = new 
LinkedBlockingDeque<MasterMessage>();
  
 -    public synchronized List<ActiveScan> getActiveScans() {
 +  private Thread majorCompactorThread;
  
 -      ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>();
 +  private HostAndPort replicationAddress;
 +  private HostAndPort clientAddress;
  
 -      long ct = System.currentTimeMillis();
 +  private volatile boolean serverStopRequested = false;
 +  private volatile boolean majorCompactorDisabled = false;
 +  private volatile boolean shutdownComplete = false;
  
 -      for (Entry<Long,Session> entry : sessions.entrySet()) {
 -        Session session = entry.getValue();
 -        if (session instanceof ScanSession) {
 -          ScanSession ss = (ScanSession) session;
 +  private ZooLock tabletServerLock;
  
 -          ScanState state = ScanState.RUNNING;
 +  private TServer server;
 +  private TServer replServer;
  
 -          ScanTask<ScanBatch> nbt = ss.nextBatchTask;
 -          if (nbt == null) {
 -            state = ScanState.IDLE;
 -          } else {
 -            switch (nbt.getScanRunState()) {
 -              case QUEUED:
 -                state = ScanState.QUEUED;
 -                break;
 -              case FINISHED:
 -                state = ScanState.IDLE;
 -                break;
 -              case RUNNING:
 -              default:
 -                /* do nothing */
 -                break;
 -            }
 -          }
 +  private DistributedWorkQueue bulkFailedCopyQ;
  
 -          ActiveScan activeScan = new ActiveScan(ss.client, ss.user, 
ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime,
 -              ScanType.SINGLE, state, ss.extent.toThrift(), 
Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio,
 -              ss.auths.getAuthorizationsBB());
 +  private String lockID;
  
 -          // scanId added by ACCUMULO-2641 is an optional thrift argument and 
not available in ActiveScan constructor
 -          activeScan.setScanId(entry.getKey());
 -          activeScans.add(activeScan);
 +  public static final AtomicLong seekCount = new AtomicLong(0);
  
 -        } else if (session instanceof MultiScanSession) {
 -          MultiScanSession mss = (MultiScanSession) session;
 +  private final AtomicLong totalMinorCompactions = new AtomicLong(0);
 +  private final ServerConfigurationFactory confFactory;
  
 -          ScanState state = ScanState.RUNNING;
 +  private final ZooAuthenticationKeyWatcher authKeyWatcher;
  
 -          ScanTask<MultiScanResult> nbt = mss.lookupTask;
 -          if (nbt == null) {
 -            state = ScanState.IDLE;
 -          } else {
 -            switch (nbt.getScanRunState()) {
 -              case QUEUED:
 -                state = ScanState.QUEUED;
 -                break;
 -              case FINISHED:
 -                state = ScanState.IDLE;
 -                break;
 -              case RUNNING:
 -              default:
 -                /* do nothing */
 -                break;
 +  public TabletServer(ServerConfigurationFactory confFactory, VolumeManager 
fs) {
 +    super(confFactory);
 +    this.confFactory = confFactory;
 +    this.fs = fs;
 +    AccumuloConfiguration aconf = getConfiguration();
 +    Instance instance = getInstance();
++    log.info("Version " + Constants.VERSION);
++    log.info("Instance " + instance.getInstanceID());
 +    this.sessionManager = new SessionManager(aconf);
 +    this.logSorter = new LogSorter(instance, fs, aconf);
 +    this.replWorker = new ReplicationWorker(this, fs);
 +    this.statsKeeper = new TabletStatsKeeper();
 +    SimpleTimer.getInstance(aconf).schedule(new Runnable() {
 +      @Override
 +      public void run() {
 +        synchronized (onlineTablets) {
 +          long now = System.currentTimeMillis();
 +          for (Tablet tablet : onlineTablets.values())
 +            try {
 +              tablet.updateRates(now);
 +            } catch (Exception ex) {
 +              log.error("Error updating rates for {}", tablet.getExtent(), 
ex);
              }
 -          }
 -
 -          activeScans.add(new ActiveScan(mss.client, mss.user, 
mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - 
mss.lastAccessTime,
 -              ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), 
Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, 
mss.auths
 -                  .getAuthorizationsBB()));
          }
        }
 +    }, 5000, 5000);
  
 -      return activeScans;
 -    }
 -  }
 -
 -  static class TservConstraintEnv implements Environment {
 -
 -    private TCredentials credentials;
 -    private SecurityOperation security;
 -    private Authorizations auths;
 -    private KeyExtent ke;
 -
 -    TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) {
 -      this.security = secOp;
 -      this.credentials = credentials;
 -    }
 -
 -    void setExtent(KeyExtent ke) {
 -      this.ke = ke;
 -    }
 -
 -    @Override
 -    public KeyExtent getExtent() {
 -      return ke;
 -    }
 -
 -    @Override
 -    public String getUser() {
 -      return credentials.getPrincipal();
 -    }
 -
 -    @Override
 -    @Deprecated
 -    public Authorizations getAuthorizations() {
 -      if (auths == null)
 -        try {
 -          this.auths = security.getUserAuthorizations(credentials);
 -        } catch (ThriftSecurityException e) {
 -          throw new RuntimeException(e);
 -        }
 -      return auths;
 -    }
 -
 -    @Override
 -    public AuthorizationContainer getAuthorizationsContainer() {
 -      return new AuthorizationContainer() {
 -        @Override
 -        public boolean contains(ByteSequence auth) {
 -          try {
 -            return security.userHasAuthorizations(credentials,
 -                Collections.<ByteBuffer> 
singletonList(ByteBuffer.wrap(auth.getBackingArray(), auth.offset(), 
auth.length())));
 -          } catch (ThriftSecurityException e) {
 -            throw new RuntimeException(e);
 -          }
 -        }
 -      };
 -    }
 -  }
 -
 -  private abstract class ScanTask<T> implements RunnableFuture<T> {
 -
 -    protected AtomicBoolean interruptFlag;
 -    protected ArrayBlockingQueue<Object> resultQueue;
 -    protected AtomicInteger state;
 -    protected AtomicReference<ScanRunState> runState;
 -
 -    private static final int INITIAL = 1;
 -    private static final int ADDED = 2;
 -    private static final int CANCELED = 3;
 -
 -    ScanTask() {
 -      interruptFlag = new AtomicBoolean(false);
 -      runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
 -      state = new AtomicInteger(INITIAL);
 -      resultQueue = new ArrayBlockingQueue<Object>(1);
 -    }
 -
 -    protected void addResult(Object o) {
 -      if (state.compareAndSet(INITIAL, ADDED))
 -        resultQueue.add(o);
 -      else if (state.get() == ADDED)
 -        throw new IllegalStateException("Tried to add more than one result");
 -    }
 -
 -    @Override
 -    public boolean cancel(boolean mayInterruptIfRunning) {
 -      if (!mayInterruptIfRunning)
 -        throw new IllegalArgumentException("Cancel will always attempt to 
interupt running next batch task");
 -
 -      if (state.get() == CANCELED)
 -        return true;
 -
 -      if (state.compareAndSet(INITIAL, CANCELED)) {
 -        interruptFlag.set(true);
 -        resultQueue = null;
 -        return true;
 -      }
 -
 -      return false;
 -    }
 -
 -    @Override
 -    public T get() throws InterruptedException, ExecutionException {
 -      throw new UnsupportedOperationException();
 -    }
 -
 -    @SuppressWarnings("unchecked")
 -    @Override
 -    public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException {
 -
 -      ArrayBlockingQueue<Object> localRQ = resultQueue;
 -
 -      if (state.get() == CANCELED)
 -        throw new CancellationException();
 -
 -      if (localRQ == null) {
 -        int st = state.get();
 -        String stateStr;
 -        switch (st) {
 -          case ADDED:
 -            stateStr = "ADDED";
 -            break;
 -          case CANCELED:
 -            stateStr = "CANCELED";
 -            break;
 -          case INITIAL:
 -            stateStr = "INITIAL";
 -            break;
 -          default:
 -            stateStr = "UNKNOWN";
 -            break;
 -        }
 -        throw new IllegalStateException("Tried to get result twice [state=" + 
stateStr + "(" + st + ")]");
 -      }
 -
 -      Object r = localRQ.poll(timeout, unit);
 -
 -      // could have been canceled while waiting
 -      if (state.get() == CANCELED) {
 -        if (r != null)
 -          throw new IllegalStateException("Nothing should have been added 
when in canceled state");
 +    final long walogMaxSize = 
aconf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE);
 +    final long minBlockSize = 
CachedConfiguration.getInstance().getLong("dfs.namenode.fs-limits.min-block-size",
 0);
 +    if (minBlockSize != 0 && minBlockSize > walogMaxSize)
 +      throw new RuntimeException("Unable to start TabletServer. Logger is set 
to use blocksize " + walogMaxSize + " but hdfs minimum block size is "
 +          + minBlockSize + ". Either increase the " + 
Property.TSERV_WALOG_MAX_SIZE + " or decrease 
dfs.namenode.fs-limits.min-block-size in hdfs-site.xml.");
  
 -        throw new CancellationException();
 +    final long toleratedWalCreationFailures = 
aconf.getCount(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES);
 +    final long walCreationFailureRetryIncrement = 
aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT);
 +    final long walCreationFailureRetryMax = 
aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION);
 +    // Tolerate `toleratedWalCreationFailures` failures, waiting 
`walCreationFailureRetryIncrement` milliseconds after the first failure,
 +    // incrementing the next wait period by the same value, for a maximum of 
`walCreationFailureRetryMax` retries.
 +    final RetryFactory walCreationRetryFactory = new 
RetryFactory(toleratedWalCreationFailures, walCreationFailureRetryIncrement,
 +        walCreationFailureRetryIncrement, walCreationFailureRetryMax);
 +
 +    logger = new TabletServerLogger(this, walogMaxSize, syncCounter, 
flushCounter, walCreationRetryFactory);
 +    this.resourceManager = new TabletServerResourceManager(this, fs);
 +    this.security = AuditedSecurityOperation.getInstance(this);
 +
 +    metricsFactory = new TabletServerMetricsFactory(aconf);
 +    updateMetrics = metricsFactory.createUpdateMetrics();
 +    scanMetrics = metricsFactory.createScanMetrics();
 +    mincMetrics = metricsFactory.createMincMetrics();
 +    SimpleTimer.getInstance(aconf).schedule(new Runnable() {
 +      @Override
 +      public void run() {
 +        TabletLocator.clearLocators();
        }
 +    }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), 
jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
  
 -      if (r == null)
 -        throw new TimeoutException();
 -
 -      // make this method stop working now that something is being
 -      // returned
 -      resultQueue = null;
 -
 -      if (r instanceof Throwable)
 -        throw new ExecutionException((Throwable) r);
 -
 -      return (T) r;
 -    }
 -
 -    @Override
 -    public boolean isCancelled() {
 -      return state.get() == CANCELED;
 -    }
 -
 -    @Override
 -    public boolean isDone() {
 -      return runState.get().equals(ScanRunState.FINISHED);
 -    }
 -
 -    public ScanRunState getScanRunState() {
 -      return runState.get();
 -    }
 -
 -  }
 -
 -  private static class ConditionalSession extends Session {
 -    public TCredentials credentials;
 -    public Authorizations auths;
 -    public String tableId;
 -    public AtomicBoolean interruptFlag;
 -
 -    @Override
 -    public void cleanup() {
 -      interruptFlag.set(true);
 -    }
 -  }
 -
 -  private static class UpdateSession extends Session {
 -    public Tablet currentTablet;
 -    public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>();
 -    Map<KeyExtent,Long> failures = new HashMap<KeyExtent,Long>();
 -    HashMap<KeyExtent,SecurityErrorCode> authFailures = new 
HashMap<KeyExtent,SecurityErrorCode>();
 -    public Violations violations;
 -    public TCredentials credentials;
 -    public long totalUpdates = 0;
 -    public long flushTime = 0;
 -    Stat prepareTimes = new Stat();
 -    Stat walogTimes = new Stat();
 -    Stat commitTimes = new Stat();
 -    Stat authTimes = new Stat();
 -    public Map<Tablet,List<Mutation>> queuedMutations = new 
HashMap<Tablet,List<Mutation>>();
 -    public long queuedMutationSize = 0;
 -    TservConstraintEnv cenv = null;
 -  }
 -
 -  private static class ScanSession extends Session {
 -    public KeyExtent extent;
 -    public HashSet<Column> columnSet;
 -    public List<IterInfo> ssiList;
 -    public Map<String,Map<String,String>> ssio;
 -    public Authorizations auths;
 -    public long entriesReturned = 0;
 -    public Stat nbTimes = new Stat();
 -    public long batchCount = 0;
 -    public volatile ScanTask<ScanBatch> nextBatchTask;
 -    public AtomicBoolean interruptFlag;
 -    public Scanner scanner;
 -    public long readaheadThreshold = 
Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
 -
 -    @Override
 -    public void cleanup() {
 -      try {
 -        if (nextBatchTask != null)
 -          nextBatchTask.cancel(true);
 -      } finally {
 -        if (scanner != null)
 -          scanner.close();
 -      }
 +    // Create the secret manager
 +    setSecretManager(new AuthenticationTokenSecretManager(instance, 
aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME)));
 +    if (aconf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
 +      log.info("SASL is enabled, creating ZooKeeper watcher for 
AuthenticationKeys");
 +      // Watcher to notice new AuthenticationKeys which enable delegation tokens
 +      authKeyWatcher = new ZooAuthenticationKeyWatcher(getSecretManager(), 
ZooReaderWriter.getInstance(), ZooUtil.getRoot(instance)
 +          + Constants.ZDELEGATION_TOKEN_KEYS);
 +    } else {
 +      authKeyWatcher = null;
      }
 -
    }
  
 -  private static class MultiScanSession extends Session {
 -    HashSet<Column> columnSet;
 -    Map<KeyExtent,List<Range>> queries;
 -    public List<IterInfo> ssiList;
 -    public Map<String,Map<String,String>> ssio;
 -    public Authorizations auths;
 -
 -    // stats
 -    int numRanges;
 -    int numTablets;
 -    int numEntries;
 -    long totalLookupTime;
 -
 -    public volatile ScanTask<MultiScanResult> lookupTask;
 -    public KeyExtent threadPoolExtent;
 -
 -    @Override
 -    public void cleanup() {
 -      if (lookupTask != null)
 -        lookupTask.cancel(true);
 -    }
 +  private static long jitter(long ms) {
 +    Random r = new Random();
 +    // add a random 10% wait
 +    return (long) ((1. + (r.nextDouble() / 10)) * ms);
    }
  
 -  /**
 -   * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids
 -   * are monotonically increasing.
 -   *
 -   */
 -  static class WriteTracker {
 -    private static AtomicLong operationCounter = new AtomicLong(1);
 -    private Map<TabletType,TreeSet<Long>> inProgressWrites = new 
EnumMap<TabletType,TreeSet<Long>>(TabletType.class);
 -
 -    WriteTracker() {
 -      for (TabletType ttype : TabletType.values()) {
 -        inProgressWrites.put(ttype, new TreeSet<Long>());
 -      }
 -    }
 +  private final SessionManager sessionManager;
  
 -    synchronized long startWrite(TabletType ttype) {
 -      long operationId = operationCounter.getAndIncrement();
 -      inProgressWrites.get(ttype).add(operationId);
 -      return operationId;
 -    }
 -
 -    synchronized void finishWrite(long operationId) {
 -      if (operationId == -1)
 -        return;
 +  private final WriteTracker writeTracker = new WriteTracker();
  
 -      boolean removed = false;
 +  private final RowLocks rowLocks = new RowLocks();
  
 -      for (TabletType ttype : TabletType.values()) {
 -        removed = inProgressWrites.get(ttype).remove(operationId);
 -        if (removed)
 -          break;
 -      }
 -
 -      if (!removed) {
 -        throw new IllegalArgumentException("Attempted to finish write not in 
progress,  operationId " + operationId);
 -      }
 -
 -      this.notifyAll();
 -    }
 -
 -    synchronized void waitForWrites(TabletType ttype) {
 -      long operationId = operationCounter.getAndIncrement();
 -      while (inProgressWrites.get(ttype).floor(operationId) != null) {
 -        try {
 -          this.wait();
 -        } catch (InterruptedException e) {
 -          log.error(e, e);
 -        }
 -      }
 -    }
 -
 -    public long startWrite(Set<Tablet> keySet) {
 -      if (keySet.size() == 0)
 -        return -1;
 -
 -      ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size());
 -
 -      for (Tablet tablet : keySet)
 -        extents.add(tablet.getExtent());
 -
 -      return startWrite(TabletType.type(extents));
 -    }
 -  }
 -
 -  public AccumuloConfiguration getSystemConfiguration() {
 -    return serverConfig.getConfiguration();
 -  }
 -
 -  TransactionWatcher watcher = new TransactionWatcher();
 +  private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
 +  private final ReentrantLock recoveryLock = new ReentrantLock(true);
  
    private class ThriftClientHandler extends ClientServiceHandler implements 
TabletClientService.Iface {
  

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01cdd020/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
index 0a8a9e3,0000000..4f38655
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
@@@ -1,432 -1,0 +1,435 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.tablet;
 +
 +import java.io.IOException;
 +import java.text.DateFormat;
 +import java.text.SimpleDateFormat;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Date;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.data.impl.KeyExtent;
 +import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.file.FileSKVIterator;
 +import org.apache.accumulo.core.file.FileSKVWriter;
 +import org.apache.accumulo.core.iterators.IteratorUtil;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
 +import org.apache.accumulo.core.iterators.system.DeletingIterator;
 +import org.apache.accumulo.core.iterators.system.MultiIterator;
 +import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.trace.Span;
 +import org.apache.accumulo.core.trace.Trace;
 +import org.apache.accumulo.core.util.LocalityGroupUtil;
 +import 
org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 +import org.apache.accumulo.server.AccumuloServerContext;
 +import org.apache.accumulo.server.fs.FileRef;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.problems.ProblemReport;
 +import org.apache.accumulo.server.problems.ProblemReportingIterator;
 +import org.apache.accumulo.server.problems.ProblemReports;
 +import org.apache.accumulo.server.problems.ProblemType;
 +import org.apache.accumulo.tserver.InMemoryMap;
 +import org.apache.accumulo.tserver.MinorCompactionReason;
 +import org.apache.accumulo.tserver.TabletIteratorEnvironment;
 +import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
 +import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public class Compactor implements Callable<CompactionStats> {
 +  private static final Logger log = LoggerFactory.getLogger(Compactor.class);
 +  private static final AtomicLong nextCompactorID = new AtomicLong(0);
 +
 +  public static class CompactionCanceledException extends Exception {
 +    private static final long serialVersionUID = 1L;
 +  }
 +
 +  public interface CompactionEnv {
 +
 +    boolean isCompactionEnabled();
 +
 +    IteratorScope getIteratorScope();
 +  }
 +
 +  private final Map<FileRef,DataFileValue> filesToCompact;
 +  private final InMemoryMap imm;
 +  private final FileRef outputFile;
 +  private final boolean propogateDeletes;
 +  private final AccumuloConfiguration acuTableConf;
 +  private final CompactionEnv env;
 +  private final VolumeManager fs;
 +  protected final KeyExtent extent;
 +  private final List<IteratorSetting> iterators;
 +
 +  // things to report
 +  private String currentLocalityGroup = "";
 +  private final long startTime;
 +
 +  private int reason;
 +
 +  private final AtomicLong entriesRead = new AtomicLong(0);
 +  private final AtomicLong entriesWritten = new AtomicLong(0);
 +  private final DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd 
HH:mm:ss.SSS");
 +
 +  // a unique id to identify a compactor
 +  private final long compactorID = nextCompactorID.getAndIncrement();
 +  protected volatile Thread thread;
 +  private final AccumuloServerContext context;
 +
 +  public long getCompactorID() {
 +    return compactorID;
 +  }
 +
 +  private synchronized void setLocalityGroup(String name) {
 +    this.currentLocalityGroup = name;
 +  }
 +
 +  public synchronized String getCurrentLocalityGroup() {
 +    return currentLocalityGroup;
 +  }
 +
 +  private void clearStats() {
 +    entriesRead.set(0);
 +    entriesWritten.set(0);
 +  }
 +
 +  protected static final Set<Compactor> runningCompactions = 
Collections.synchronizedSet(new HashSet<Compactor>());
 +
 +  public static List<CompactionInfo> getRunningCompactions() {
 +    ArrayList<CompactionInfo> compactions = new ArrayList<CompactionInfo>();
 +
 +    synchronized (runningCompactions) {
 +      for (Compactor compactor : runningCompactions) {
 +        compactions.add(new CompactionInfo(compactor));
 +      }
 +    }
 +
 +    return compactions;
 +  }
 +
 +  public Compactor(AccumuloServerContext context, Tablet tablet, 
Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile,
 +      boolean propogateDeletes, CompactionEnv env, List<IteratorSetting> 
iterators, int reason, AccumuloConfiguration tableConfiguation) {
 +    this.context = context;
 +    this.extent = tablet.getExtent();
 +    this.fs = tablet.getTabletServer().getFileSystem();
 +    this.acuTableConf = tableConfiguation;
 +    this.filesToCompact = files;
 +    this.imm = imm;
 +    this.outputFile = outputFile;
 +    this.propogateDeletes = propogateDeletes;
 +    this.env = env;
 +    this.iterators = iterators;
 +    this.reason = reason;
 +
 +    startTime = System.currentTimeMillis();
 +  }
 +
 +  public VolumeManager getFileSystem() {
 +    return fs;
 +  }
 +
 +  KeyExtent getExtent() {
 +    return extent;
 +  }
 +
 +  String getOutputFile() {
 +    return outputFile.toString();
 +  }
 +
 +  MajorCompactionReason getMajorCompactionReason() {
 +    return MajorCompactionReason.values()[reason];
 +  }
 +
 +  @Override
 +  public CompactionStats call() throws IOException, 
CompactionCanceledException {
 +
 +    FileSKVWriter mfw = null;
 +
 +    CompactionStats majCStats = new CompactionStats();
 +
 +    boolean remove = runningCompactions.add(this);
 +
 +    clearStats();
 +
++    final Path outputFilePath = outputFile.path();
++    final String outputFilePathName = outputFilePath.toString();
 +    String oldThreadName = Thread.currentThread().getName();
 +    String newThreadName = "MajC compacting " + extent.toString() + " started 
" + dateFormatter.format(new Date()) + " file: " + outputFile;
 +    Thread.currentThread().setName(newThreadName);
 +    thread = Thread.currentThread();
 +    try {
 +      FileOperations fileFactory = FileOperations.getInstance();
-       FileSystem ns = 
this.fs.getVolumeByPath(outputFile.path()).getFileSystem();
-       mfw = fileFactory.openWriter(outputFile.path().toString(), ns, 
ns.getConf(), acuTableConf);
++      FileSystem ns = this.fs.getVolumeByPath(outputFilePath).getFileSystem();
++      mfw = fileFactory.openWriter(outputFilePathName, ns, ns.getConf(), 
acuTableConf);
 +
 +      Map<String,Set<ByteSequence>> lGroups;
 +      try {
 +        lGroups = LocalityGroupUtil.getLocalityGroups(acuTableConf);
 +      } catch (LocalityGroupConfigurationError e) {
 +        throw new IOException(e);
 +      }
 +
 +      long t1 = System.currentTimeMillis();
 +
 +      HashSet<ByteSequence> allColumnFamilies = new HashSet<ByteSequence>();
 +
 +      if (mfw.supportsLocalityGroups()) {
 +        for (Entry<String,Set<ByteSequence>> entry : lGroups.entrySet()) {
 +          setLocalityGroup(entry.getKey());
 +          compactLocalityGroup(entry.getKey(), entry.getValue(), true, mfw, 
majCStats);
 +          allColumnFamilies.addAll(entry.getValue());
 +        }
 +      }
 +
 +      setLocalityGroup("");
 +      compactLocalityGroup(null, allColumnFamilies, false, mfw, majCStats);
 +
 +      long t2 = System.currentTimeMillis();
 +
 +      FileSKVWriter mfwTmp = mfw;
 +      mfw = null; // set this to null so we do not try to close it again in 
finally if the close fails
 +      mfwTmp.close(); // if the close fails it will cause the compaction to 
fail
 +
 +      // Verify the file, since hadoop 0.20.2 sometimes lies about the success of close()
 +      try {
-         FileSKVIterator openReader = 
fileFactory.openReader(outputFile.path().toString(), false, ns, ns.getConf(), 
acuTableConf);
++        FileSKVIterator openReader = 
fileFactory.openReader(outputFilePathName, false, ns, ns.getConf(), 
acuTableConf);
 +        openReader.close();
 +      } catch (IOException ex) {
 +        log.error("Verification of successful compaction fails!!! " + extent 
+ " " + outputFile, ex);
 +        throw ex;
 +      }
 +
 +      log.debug(String.format("Compaction %s %,d read | %,d written | %,6d 
entries/sec | %6.3f secs", extent, majCStats.getEntriesRead(),
 +          majCStats.getEntriesWritten(), (int) (majCStats.getEntriesRead() / 
((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0));
 +
-       
majCStats.setFileSize(fileFactory.getFileSize(outputFile.path().toString(), ns, 
ns.getConf(), acuTableConf));
++      majCStats.setFileSize(fileFactory.getFileSize(outputFilePathName, ns, 
ns.getConf(), acuTableConf));
 +      return majCStats;
 +    } catch (IOException e) {
 +      log.error("{}", e.getMessage(), e);
 +      throw e;
 +    } catch (RuntimeException e) {
 +      log.error("{}", e.getMessage(), e);
 +      throw e;
 +    } finally {
 +      Thread.currentThread().setName(oldThreadName);
 +      if (remove) {
 +        thread = null;
 +        runningCompactions.remove(this);
 +      }
 +
 +      try {
 +        if (mfw != null) {
 +          // compaction must not have finished successfully, so close its output file
 +          try {
 +            mfw.close();
 +          } finally {
 +            if (!fs.deleteRecursively(outputFile.path()))
 +              if (fs.exists(outputFile.path()))
 +                log.error("Unable to delete " + outputFile);
 +          }
 +        }
 +      } catch (IOException e) {
 +        log.warn("{}", e.getMessage(), e);
 +      } catch (RuntimeException exception) {
 +        log.warn("{}", exception.getMessage(), exception);
 +      }
 +    }
 +  }
 +
 +  private List<SortedKeyValueIterator<Key,Value>> openMapDataFiles(String 
lgName, ArrayList<FileSKVIterator> readers) throws IOException {
 +
 +    List<SortedKeyValueIterator<Key,Value>> iters = new 
ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size());
 +
 +    for (FileRef mapFile : filesToCompact.keySet()) {
 +      try {
 +
 +        FileOperations fileFactory = FileOperations.getInstance();
 +        FileSystem fs = 
this.fs.getVolumeByPath(mapFile.path()).getFileSystem();
 +        FileSKVIterator reader;
 +
 +        reader = fileFactory.openReader(mapFile.path().toString(), false, fs, 
fs.getConf(), acuTableConf);
 +
 +        readers.add(reader);
 +
 +        SortedKeyValueIterator<Key,Value> iter = new 
ProblemReportingIterator(context, extent.getTableId().toString(), 
mapFile.path().toString(), false, reader);
 +
 +        if (filesToCompact.get(mapFile).isTimeSet()) {
 +          iter = new TimeSettingIterator(iter, 
filesToCompact.get(mapFile).getTime());
 +        }
 +
 +        iters.add(iter);
 +
 +      } catch (Throwable e) {
 +
 +        ProblemReports.getInstance(context).report(new 
ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, 
mapFile.path().toString(), e));
 +
 +        log.warn("Some problem opening map file {} {}", mapFile, 
e.getMessage(), e);
 +        // failed to open some map file... close the ones that were opened
 +        for (FileSKVIterator reader : readers) {
 +          try {
 +            reader.close();
 +          } catch (Throwable e2) {
 +            log.warn("Failed to close map file", e2);
 +          }
 +        }
 +
 +        readers.clear();
 +
 +        if (e instanceof IOException)
 +          throw (IOException) e;
 +        throw new IOException("Failed to open map data files", e);
 +      }
 +    }
 +
 +    return iters;
 +  }
 +
 +  private void compactLocalityGroup(String lgName, Set<ByteSequence> 
columnFamilies, boolean inclusive, FileSKVWriter mfw, CompactionStats majCStats)
 +      throws IOException, CompactionCanceledException {
 +    ArrayList<FileSKVIterator> readers = new 
ArrayList<FileSKVIterator>(filesToCompact.size());
 +    Span span = Trace.start("compact");
 +    try {
 +      long entriesCompacted = 0;
 +      List<SortedKeyValueIterator<Key,Value>> iters = 
openMapDataFiles(lgName, readers);
 +
 +      if (imm != null) {
 +        iters.add(imm.compactionIterator());
 +      }
 +
 +      CountingIterator citr = new CountingIterator(new MultiIterator(iters, 
extent.toDataRange()), entriesRead);
 +      DeletingIterator delIter = new DeletingIterator(citr, propogateDeletes);
 +      ColumnFamilySkippingIterator cfsi = new 
ColumnFamilySkippingIterator(delIter);
 +
 +      // if(env.getIteratorScope() )
 +
 +      TabletIteratorEnvironment iterEnv;
 +      if (env.getIteratorScope() == IteratorScope.majc)
 +        iterEnv = new TabletIteratorEnvironment(IteratorScope.majc, 
!propogateDeletes, acuTableConf);
 +      else if (env.getIteratorScope() == IteratorScope.minc)
 +        iterEnv = new TabletIteratorEnvironment(IteratorScope.minc, 
acuTableConf);
 +      else
 +        throw new IllegalArgumentException();
 +
 +      SortedKeyValueIterator<Key,Value> itr = 
iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(env.getIteratorScope(), 
cfsi, extent, acuTableConf,
 +          iterators, iterEnv));
 +
 +      itr.seek(extent.toDataRange(), columnFamilies, inclusive);
 +
 +      if (!inclusive) {
 +        mfw.startDefaultLocalityGroup();
 +      } else {
 +        mfw.startNewLocalityGroup(lgName, columnFamilies);
 +      }
 +
 +      Span write = Trace.start("write");
 +      try {
 +        while (itr.hasTop() && env.isCompactionEnabled()) {
 +          mfw.append(itr.getTopKey(), itr.getTopValue());
 +          itr.next();
 +          entriesCompacted++;
 +
 +          if (entriesCompacted % 1024 == 0) {
 +            // Periodically update stats, do not want to do this too often since it's volatile
 +            entriesWritten.addAndGet(1024);
 +          }
 +        }
 +
 +        if (itr.hasTop() && !env.isCompactionEnabled()) {
 +          // cancel major compaction operation
 +          try {
 +            try {
 +              mfw.close();
 +            } catch (IOException e) {
 +              log.error("{}", e.getMessage(), e);
 +            }
 +            fs.deleteRecursively(outputFile.path());
 +          } catch (Exception e) {
 +            log.warn("Failed to delete Canceled compaction output file " + 
outputFile, e);
 +          }
 +          throw new CompactionCanceledException();
 +        }
 +
 +      } finally {
 +        CompactionStats lgMajcStats = new CompactionStats(citr.getCount(), 
entriesCompacted);
 +        majCStats.add(lgMajcStats);
 +        write.stop();
 +      }
 +
 +    } finally {
 +      // close sequence files opened
 +      for (FileSKVIterator reader : readers) {
 +        try {
 +          reader.close();
 +        } catch (Throwable e) {
 +          log.warn("Failed to close map file", e);
 +        }
 +      }
 +      span.stop();
 +    }
 +  }
 +
 +  Collection<FileRef> getFilesToCompact() {
 +    return filesToCompact.keySet();
 +  }
 +
 +  boolean hasIMM() {
 +    return imm != null;
 +  }
 +
 +  boolean willPropogateDeletes() {
 +    return propogateDeletes;
 +  }
 +
 +  long getEntriesRead() {
 +    return entriesRead.get();
 +  }
 +
 +  long getEntriesWritten() {
 +    return entriesWritten.get();
 +  }
 +
 +  long getStartTime() {
 +    return startTime;
 +  }
 +
 +  Iterable<IteratorSetting> getIterators() {
 +    return this.iterators;
 +  }
 +
 +  MinorCompactionReason getMinCReason() {
 +    return MinorCompactionReason.values()[reason];
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01cdd020/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
index 490ecd3,0000000..2aa772f
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@@ -1,147 -1,0 +1,148 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.tablet;
 +
 +import java.io.IOException;
 +import java.util.Collections;
 +import java.util.Map;
 +import java.util.Random;
 +
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.server.conf.TableConfiguration;
 +import org.apache.accumulo.server.fs.FileRef;
 +import org.apache.accumulo.server.problems.ProblemReport;
 +import org.apache.accumulo.server.problems.ProblemReports;
 +import org.apache.accumulo.server.problems.ProblemType;
 +import org.apache.accumulo.tserver.InMemoryMap;
 +import org.apache.accumulo.tserver.MinorCompactionReason;
 +import org.apache.accumulo.tserver.TabletServer;
 +import org.apache.hadoop.fs.Path;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public class MinorCompactor extends Compactor {
 +
 +  private static final Logger log = 
LoggerFactory.getLogger(MinorCompactor.class);
 +
 +  private static final Map<FileRef,DataFileValue> EMPTY_MAP = 
Collections.emptyMap();
 +
 +  private static Map<FileRef,DataFileValue> toFileMap(FileRef mergeFile, 
DataFileValue dfv) {
 +    if (mergeFile == null)
 +      return EMPTY_MAP;
 +
 +    return Collections.singletonMap(mergeFile, dfv);
 +  }
 +
 +  private final TabletServer tabletServer;
 +
 +  public MinorCompactor(TabletServer tabletServer, Tablet tablet, InMemoryMap 
imm, FileRef mergeFile, DataFileValue dfv, FileRef outputFile,
 +      MinorCompactionReason mincReason, TableConfiguration tableConfig) {
 +    super(tabletServer, tablet, toFileMap(mergeFile, dfv), imm, outputFile, 
true, new CompactionEnv() {
 +
 +      @Override
 +      public boolean isCompactionEnabled() {
 +        return true;
 +      }
 +
 +      @Override
 +      public IteratorScope getIteratorScope() {
 +        return IteratorScope.minc;
 +      }
 +    }, Collections.<IteratorSetting> emptyList(), mincReason.ordinal(), 
tableConfig);
 +    this.tabletServer = tabletServer;
 +  }
 +
 +  private boolean isTableDeleting() {
 +    try {
 +      return Tables.getTableState(tabletServer.getInstance(), 
extent.getTableId().toString()) == TableState.DELETING;
 +    } catch (Exception e) {
 +      log.warn("Failed to determine if table " + extent.getTableId() + " was 
deleting ", e);
 +      return false; // can not get positive confirmation that its deleting.
 +    }
 +  }
 +
 +  @Override
 +  public CompactionStats call() {
-     log.debug("Begin minor compaction " + getOutputFile() + " " + 
getExtent());
++    final String outputFileName = getOutputFile();
++    log.debug("Begin minor compaction " + outputFileName + " " + getExtent());
 +
 +    // output to new MapFile with a temporary name
 +    int sleepTime = 100;
 +    double growthFactor = 4;
 +    int maxSleepTime = 1000 * 60 * 3; // 3 minutes
 +    boolean reportedProblem = false;
 +
 +    runningCompactions.add(this);
 +    try {
 +      do {
 +        try {
 +          CompactionStats ret = super.call();
 +
 +          // log.debug(String.format("MinC %,d recs in | %,d recs out | %,d recs/sec | %6.3f secs | %,d bytes ",map.size(), entriesCompacted,
 +          // (int)(map.size()/((t2 - t1)/1000.0)), (t2 - t1)/1000.0, estimatedSizeInBytes()));
 +
 +          if (reportedProblem) {
-             
ProblemReports.getInstance(tabletServer).deleteProblemReport(getExtent().getTableId().toString(),
 ProblemType.FILE_WRITE, getOutputFile());
++            
ProblemReports.getInstance(tabletServer).deleteProblemReport(getExtent().getTableId().toString(),
 ProblemType.FILE_WRITE, outputFileName);
 +          }
 +
 +          return ret;
 +        } catch (IOException e) {
-           log.warn("MinC failed ({}) to create {} retrying ...", 
e.getMessage(), getOutputFile());
-           ProblemReports.getInstance(tabletServer).report(new 
ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, 
getOutputFile(), e));
++          log.warn("MinC failed ({}) to create {} retrying ...", 
e.getMessage(), outputFileName);
++          ProblemReports.getInstance(tabletServer).report(new 
ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, 
outputFileName, e));
 +          reportedProblem = true;
 +        } catch (RuntimeException e) {
 +          // if this is coming from a user iterator, it is possible that the user could change the iterator config and that the
 +          // minor compaction would succeed
-           log.warn("MinC failed ({}) to create {} retrying ...", 
e.getMessage(), getOutputFile(), e);
-           ProblemReports.getInstance(tabletServer).report(new 
ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, 
getOutputFile(), e));
++          log.warn("MinC failed ({}) to create {} retrying ...", 
e.getMessage(), outputFileName, e);
++          ProblemReports.getInstance(tabletServer).report(new 
ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, 
outputFileName, e));
 +          reportedProblem = true;
 +        } catch (CompactionCanceledException e) {
 +          throw new IllegalStateException(e);
 +        }
 +
 +        Random random = new Random();
 +
 +        int sleep = sleepTime + random.nextInt(sleepTime);
 +        log.debug("MinC failed sleeping " + sleep + " ms before retrying");
 +        UtilWaitThread.sleep(sleep);
 +        sleepTime = (int) Math.round(Math.min(maxSleepTime, sleepTime * 
growthFactor));
 +
 +        // clean up
 +        try {
-           if (getFileSystem().exists(new Path(getOutputFile()))) {
-             getFileSystem().deleteRecursively(new Path(getOutputFile()));
++          if (getFileSystem().exists(new Path(outputFileName))) {
++            getFileSystem().deleteRecursively(new Path(outputFileName));
 +          }
 +        } catch (IOException e) {
-           log.warn("Failed to delete failed MinC file {} {}", 
getOutputFile(), e.getMessage());
++          log.warn("Failed to delete failed MinC file {} {}", outputFileName, 
e.getMessage());
 +        }
 +
 +        if (isTableDeleting())
 +          return new CompactionStats(0, 0);
 +
 +      } while (true);
 +    } finally {
 +      thread = null;
 +      runningCompactions.remove(this);
 +    }
 +  }
 +
 +}

Reply via email to