Merge branch '1.6' into 1.7

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2b286ba5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2b286ba5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2b286ba5

Branch: refs/heads/master
Commit: 2b286ba5660ec78e5f1cb675fde55ccdeba6a327
Parents: cd5eb1f 0206d78
Author: Dave Marion <dlmar...@apache.org>
Authored: Thu Mar 31 16:23:03 2016 -0400
Committer: Dave Marion <dlmar...@apache.org>
Committed: Thu Mar 31 16:23:03 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/accumulo/core/conf/Property.java     |  3 +--
 .../java/org/apache/accumulo/tserver/TabletServer.java   |  3 ++-
 .../apache/accumulo/tserver/log/TabletServerLogger.java  | 11 ++++++++---
 3 files changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/2b286ba5/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 28cc861,2149ad9..dbb2036
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -252,16 -209,7 +252,15 @@@ public enum Property 
            + "must be made, which is slower. However opening too many files at 
once can cause problems."),
    TSERV_WALOG_MAX_SIZE("tserver.walog.max.size", "1G", PropertyType.MEMORY,
        "The maximum size for each write-ahead log. See comment for property 
tserver.memory.maps.max"),
- 
+   TSERV_WALOG_MAX_AGE("tserver.walog.max.age", "24h", 
PropertyType.TIMEDURATION, "The maximum age for each write-ahead log."),
 +  
TSERV_WALOG_TOLERATED_CREATION_FAILURES("tserver.walog.tolerated.creation.failures",
 "50", PropertyType.COUNT,
 +      "The maximum number of failures tolerated when creating a new WAL file 
within the period specified by tserver.walog.failures.period."
 +          + " Exceeding this number of failures in the period causes the 
TabletServer to exit."),
 +  
TSERV_WALOG_TOLERATED_WAIT_INCREMENT("tserver.walog.tolerated.wait.increment", 
"1000ms", PropertyType.TIMEDURATION,
 +      "The amount of time to wait between failures to create a WALog."),
 +  // Never wait longer than 5 mins for a retry
 +  
TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION("tserver.walog.maximum.wait.duration",
 "5m", PropertyType.TIMEDURATION,
 +      "The maximum amount of time to wait after a failure to create a WAL 
file."),
- 
    TSERV_MAJC_DELAY("tserver.compaction.major.delay", "30s", 
PropertyType.TIMEDURATION,
        "Time a tablet server will sleep between checking which tablets need 
compaction."),
    TSERV_MAJC_THREAD_MAXOPEN("tserver.compaction.major.thread.files.open.max", 
"10", PropertyType.COUNT,

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2b286ba5/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index eabf51d,38bd8ac..a3b224f
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -254,132 -257,634 +254,133 @@@ public class TabletServer extends Accum
    private static final long TIME_BETWEEN_GC_CHECKS = 5000;
    private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = 60 * 60 * 
1000;
  
 -  private TabletServerLogger logger;
 -
 -  protected TabletServerMinCMetrics mincMetrics = new 
TabletServerMinCMetrics();
 +  private final GarbageCollectionLogger gcLogger = new 
GarbageCollectionLogger();
 +  private final TransactionWatcher watcher = new TransactionWatcher();
 +  private final ZooCache masterLockCache = new ZooCache();
  
 -  private ServerConfiguration serverConfig;
 -  private LogSorter logSorter = null;
 +  private final TabletServerLogger logger;
  
 -  public TabletServer(ServerConfiguration conf, VolumeManager fs) {
 -    super();
 -    this.serverConfig = conf;
 -    this.instance = conf.getInstance();
 -    this.fs = fs;
 -
 -    log.info("Version " + Constants.VERSION);
 -    log.info("Instance " + instance.getInstanceID());
 -
 -    this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
 -    SimpleTimer.getInstance().schedule(new Runnable() {
 -      @Override
 -      public void run() {
 -        synchronized (onlineTablets) {
 -          long now = System.currentTimeMillis();
 -          for (Tablet tablet : onlineTablets.values())
 -            try {
 -              tablet.updateRates(now);
 -            } catch (Exception ex) {
 -              log.error(ex, ex);
 -            }
 -        }
 -      }
 -    }, TIME_BETWEEN_GC_CHECKS, TIME_BETWEEN_GC_CHECKS);
 -    SimpleTimer.getInstance().schedule(new Runnable() {
 -      @Override
 -      public void run() {
 -        TabletLocator.clearLocators();
 -      }
 -    }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), 
jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
 -  }
 +  private final TabletServerMetricsFactory metricsFactory;
 +  private final Metrics updateMetrics;
 +  private final Metrics scanMetrics;
 +  private final Metrics mincMetrics;
  
 -  private static long jitter(long ms) {
 -    Random r = new Random();
 -    // add a random 10% wait
 -    return (long) ((1. + (r.nextDouble() / 10)) * ms);
 +  public Metrics getMinCMetrics() {
 +    return mincMetrics;
    }
  
 -  private synchronized static void logGCInfo(AccumuloConfiguration conf) {
 -    long now = System.currentTimeMillis();
 -
 -    List<GarbageCollectorMXBean> gcmBeans = 
ManagementFactory.getGarbageCollectorMXBeans();
 -    Runtime rt = Runtime.getRuntime();
 -
 -    StringBuilder sb = new StringBuilder("gc");
 -
 -    boolean sawChange = false;
 -
 -    long maxIncreaseInCollectionTime = 0;
 -
 -    for (GarbageCollectorMXBean gcBean : gcmBeans) {
 -      Long prevTime = prevGcTime.get(gcBean.getName());
 -      long pt = 0;
 -      if (prevTime != null) {
 -        pt = prevTime;
 -      }
 -
 -      long time = gcBean.getCollectionTime();
 -
 -      if (time - pt != 0) {
 -        sawChange = true;
 -      }
 -
 -      long increaseInCollectionTime = time - pt;
 -      sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), 
time / 1000.0, increaseInCollectionTime / 1000.0));
 -      maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, 
maxIncreaseInCollectionTime);
 -      prevGcTime.put(gcBean.getName(), time);
 -    }
 -
 -    long mem = rt.freeMemory();
 -    if (maxIncreaseInCollectionTime == 0) {
 -      gcTimeIncreasedCount = 0;
 -    } else {
 -      gcTimeIncreasedCount++;
 -      if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 0.05) {
 -        log.warn("Running low on memory");
 -        gcTimeIncreasedCount = 0;
 -      }
 -    }
 -
 -    if (mem > lastMemorySize) {
 -      sawChange = true;
 -    }
 -
 -    String sign = "+";
 -    if (mem - lastMemorySize <= 0) {
 -      sign = "";
 -    }
 -
 -    sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, 
(mem - lastMemorySize), rt.totalMemory()));
 -
 -    if (sawChange) {
 -      log.debug(sb.toString());
 -    }
 -
 -    final long keepAliveTimeout = 
conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
 -    if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) {
 -      long diff = now - lastMemoryCheckTime;
 -      if (diff > keepAliveTimeout + 1000) {
 -        log.warn(String.format("GC pause checker not called in a timely 
fashion. Expected every %.1f seconds but was %.1f seconds since last check",
 -            TIME_BETWEEN_GC_CHECKS / 1000., diff / 1000.));
 -      }
 -      lastMemoryCheckTime = now;
 -      return;
 -    }
 -
 -    if (maxIncreaseInCollectionTime > keepAliveTimeout) {
 -      Halt.halt("Garbage collection may be interfering with lock keep-alive.  
Halting.", -1);
 -    }
 -
 -    lastMemorySize = mem;
 -    lastMemoryCheckTime = now;
 -  }
 -
 -  private TabletStatsKeeper statsKeeper;
 -
 -  private static class Session {
 -    long lastAccessTime;
 -    long startTime;
 -    String user;
 -    String client = TServerUtils.clientAddress.get();
 -    public boolean reserved;
 -
 -    public boolean cleanup() {
 -      return true;
 -    }
 -  }
 -
 -  private static class SessionManager {
 -
 -    SecureRandom random;
 -    Map<Long,Session> sessions;
 -    private long maxIdle;
 -    private long maxUpdateIdle;
 -    private List<Session> idleSessions = new ArrayList<Session>();
 -    private final Long expiredSessionMarker = Long.valueOf(-1);
 -
 -    SessionManager(AccumuloConfiguration conf) {
 -      random = new SecureRandom();
 -      sessions = new HashMap<Long,Session>();
 -      maxUpdateIdle = 
conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE);
 -      maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
 -
 -      Runnable r = new Runnable() {
 -        @Override
 -        public void run() {
 -          sweep(maxIdle, maxUpdateIdle);
 -        }
 -      };
 -
 -      SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000));
 -    }
 -
 -    synchronized long createSession(Session session, boolean reserve) {
 -      long sid = random.nextLong();
 -
 -      while (sessions.containsKey(sid)) {
 -        sid = random.nextLong();
 -      }
 -
 -      sessions.put(sid, session);
 -
 -      session.reserved = reserve;
 -
 -      session.startTime = session.lastAccessTime = System.currentTimeMillis();
 -
 -      return sid;
 -    }
 -
 -    long getMaxIdleTime() {
 -      return maxIdle;
 -    }
 -
 -    /**
 -     * while a session is reserved, it cannot be canceled or removed
 -     */
 -    synchronized Session reserveSession(long sessionId) {
 -      Session session = sessions.get(sessionId);
 -      if (session != null) {
 -        if (session.reserved)
 -          throw new IllegalStateException();
 -        session.reserved = true;
 -      }
 -
 -      return session;
 -
 -    }
 -
 -    synchronized Session reserveSession(long sessionId, boolean wait) {
 -      Session session = sessions.get(sessionId);
 -      if (session != null) {
 -        while (wait && session.reserved) {
 -          try {
 -            wait(1000);
 -          } catch (InterruptedException e) {
 -            throw new RuntimeException();
 -          }
 -        }
 -
 -        if (session.reserved)
 -          throw new IllegalStateException();
 -        session.reserved = true;
 -      }
 -
 -      return session;
 -
 -    }
 -
 -    synchronized void unreserveSession(Session session) {
 -      if (!session.reserved)
 -        throw new IllegalStateException();
 -      notifyAll();
 -      session.reserved = false;
 -      session.lastAccessTime = System.currentTimeMillis();
 -    }
 -
 -    synchronized void unreserveSession(long sessionId) {
 -      Session session = getSession(sessionId);
 -      if (session != null)
 -        unreserveSession(session);
 -    }
 -
 -    synchronized Session getSession(long sessionId) {
 -      Session session = sessions.get(sessionId);
 -      if (session != null)
 -        session.lastAccessTime = System.currentTimeMillis();
 -      return session;
 -    }
 -
 -    Session removeSession(long sessionId) {
 -      return removeSession(sessionId, false);
 -    }
 -
 -    Session removeSession(long sessionId, boolean unreserve) {
 -      Session session = null;
 -      synchronized (this) {
 -        session = sessions.remove(sessionId);
 -        if (unreserve && session != null)
 -          unreserveSession(session);
 -      }
 -
 -      // do clean up out side of lock..
 -      if (session != null)
 -        session.cleanup();
 -
 -      return session;
 -    }
 -
 -    private void sweep(final long maxIdle, final long maxUpdateIdle) {
 -      ArrayList<Session> sessionsToCleanup = new ArrayList<Session>();
 -      synchronized (this) {
 -        Iterator<Session> iter = sessions.values().iterator();
 -        while (iter.hasNext()) {
 -          Session session = iter.next();
 -          long configuredIdle = maxIdle;
 -          if (session instanceof UpdateSession) {
 -            configuredIdle = maxUpdateIdle;
 -          }
 -          long idleTime = System.currentTimeMillis() - session.lastAccessTime;
 -          if (idleTime > configuredIdle && !session.reserved) {
 -            log.info("Closing idle session from user=" + session.user + ", 
client=" + session.client + ", idle=" + idleTime + "ms");
 -            iter.remove();
 -            sessionsToCleanup.add(session);
 -          }
 -        }
 -      }
 -
 -      // do clean up outside of lock for TabletServer in a synchronized block 
for simplicity vice a synchronized list
 -
 -      synchronized (idleSessions) {
 -
 -        sessionsToCleanup.addAll(idleSessions);
 -
 -        idleSessions.clear();
 -
 -        // perform cleanup for all of the sessions
 -        for (Session session : sessionsToCleanup) {
 -          if (!session.cleanup())
 -            idleSessions.add(session);
 -        }
 -      }
 -
 -    }
 -
 -    synchronized void removeIfNotAccessed(final long sessionId, final long 
delay) {
 -      Session session = sessions.get(sessionId);
 -      if (session != null) {
 -        final long removeTime = session.lastAccessTime;
 -        TimerTask r = new TimerTask() {
 -          @Override
 -          public void run() {
 -            Session sessionToCleanup = null;
 -            synchronized (SessionManager.this) {
 -              Session session2 = sessions.get(sessionId);
 -              if (session2 != null && session2.lastAccessTime == removeTime 
&& !session2.reserved) {
 -                log.info("Closing not accessed session from user=" + 
session2.user + ", client=" + session2.client + ", duration=" + delay + "ms");
 -                sessions.remove(sessionId);
 -                sessionToCleanup = session2;
 -              }
 -            }
 -
 -            // call clean up outside of lock
 -            if (sessionToCleanup != null)
 -              sessionToCleanup.cleanup();
 -          }
 -        };
 -
 -        SimpleTimer.getInstance().schedule(r, delay);
 -      }
 -    }
 -
 -    public synchronized Map<String,MapCounter<ScanRunState>> 
getActiveScansPerTable() {
 -      Map<String,MapCounter<ScanRunState>> counts = new 
HashMap<String,MapCounter<ScanRunState>>();
 -      Set<Entry<Long,Session>> copiedIdleSessions = new 
HashSet<Entry<Long,Session>>();
 -
 -      synchronized (idleSessions) {
 -        /**
 -         * Add sessions so that get the list returned in the active scans call
 -         */
 -        for (Session session : idleSessions) {
 -          copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, 
session));
 -        }
 -      }
 -
 -      for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), 
copiedIdleSessions)) {
 -
 -        Session session = entry.getValue();
 -        @SuppressWarnings("rawtypes")
 -        ScanTask nbt = null;
 -        String tableID = null;
 -
 -        if (session instanceof ScanSession) {
 -          ScanSession ss = (ScanSession) session;
 -          nbt = ss.nextBatchTask;
 -          tableID = ss.extent.getTableId().toString();
 -        } else if (session instanceof MultiScanSession) {
 -          MultiScanSession mss = (MultiScanSession) session;
 -          nbt = mss.lookupTask;
 -          tableID = mss.threadPoolExtent.getTableId().toString();
 -        }
 -
 -        if (nbt == null)
 -          continue;
 +  private final LogSorter logSorter;
 +  private ReplicationWorker replWorker = null;
 +  private final TabletStatsKeeper statsKeeper;
 +  private final AtomicInteger logIdGenerator = new AtomicInteger();
  
 -        ScanRunState srs = nbt.getScanRunState();
 +  private final AtomicLong flushCounter = new AtomicLong(0);
 +  private final AtomicLong syncCounter = new AtomicLong(0);
  
 -        if (srs == ScanRunState.FINISHED)
 -          continue;
 +  private final VolumeManager fs;
  
 -        MapCounter<ScanRunState> stateCounts = counts.get(tableID);
 -        if (stateCounts == null) {
 -          stateCounts = new MapCounter<ScanRunState>();
 -          counts.put(tableID, stateCounts);
 -        }
 +  private final SortedMap<KeyExtent,Tablet> onlineTablets = 
Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
 +  private final SortedSet<KeyExtent> unopenedTablets = 
Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
 +  private final SortedSet<KeyExtent> openingTablets = 
Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
 +  @SuppressWarnings("unchecked")
 +  private final Map<KeyExtent,Long> recentlyUnloadedCache = 
Collections.synchronizedMap(new LRUMap(1000));
  
 -        stateCounts.increment(srs, 1);
 -      }
 +  private final TabletServerResourceManager resourceManager;
 +  private final SecurityOperation security;
  
 -      return counts;
 -    }
 +  private final BlockingDeque<MasterMessage> masterMessages = new 
LinkedBlockingDeque<MasterMessage>();
  
 -    public synchronized List<ActiveScan> getActiveScans() {
 +  private Thread majorCompactorThread;
  
 -      final List<ActiveScan> activeScans = new ArrayList<ActiveScan>();
 -      final long ct = System.currentTimeMillis();
 -      final Set<Entry<Long,Session>> copiedIdleSessions = new 
HashSet<Entry<Long,Session>>();
 +  private HostAndPort replicationAddress;
 +  private HostAndPort clientAddress;
  
 -      synchronized (idleSessions) {
 -        /**
 -         * Add sessions so that get the list returned in the active scans call
 -         */
 -        for (Session session : idleSessions) {
 -          copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, 
session));
 -        }
 -      }
 +  private volatile boolean serverStopRequested = false;
 +  private volatile boolean majorCompactorDisabled = false;
 +  private volatile boolean shutdownComplete = false;
  
 -      for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), 
copiedIdleSessions)) {
 -        Session session = entry.getValue();
 -        if (session instanceof ScanSession) {
 -          ScanSession ss = (ScanSession) session;
 +  private ZooLock tabletServerLock;
  
 -          ScanState state = ScanState.RUNNING;
 +  private TServer server;
 +  private TServer replServer;
  
 -          ScanTask<ScanBatch> nbt = ss.nextBatchTask;
 -          if (nbt == null) {
 -            state = ScanState.IDLE;
 -          } else {
 -            switch (nbt.getScanRunState()) {
 -              case QUEUED:
 -                state = ScanState.QUEUED;
 -                break;
 -              case FINISHED:
 -                state = ScanState.IDLE;
 -                break;
 -              case RUNNING:
 -              default:
 -                /* do nothing */
 -                break;
 -            }
 -          }
 +  private DistributedWorkQueue bulkFailedCopyQ;
  
 -          ActiveScan activeScan = new ActiveScan(ss.client, ss.user, 
ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime,
 -              ScanType.SINGLE, state, ss.extent.toThrift(), 
Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio,
 -              ss.auths.getAuthorizationsBB());
 +  private String lockID;
  
 -          // scanId added by ACCUMULO-2641 is an optional thrift argument and 
not available in ActiveScan constructor
 -          activeScan.setScanId(entry.getKey());
 -          activeScans.add(activeScan);
 +  public static final AtomicLong seekCount = new AtomicLong(0);
  
 -        } else if (session instanceof MultiScanSession) {
 -          MultiScanSession mss = (MultiScanSession) session;
 +  private final AtomicLong totalMinorCompactions = new AtomicLong(0);
 +  private final ServerConfigurationFactory confFactory;
  
 -          ScanState state = ScanState.RUNNING;
 +  private final ZooAuthenticationKeyWatcher authKeyWatcher;
  
 -          ScanTask<MultiScanResult> nbt = mss.lookupTask;
 -          if (nbt == null) {
 -            state = ScanState.IDLE;
 -          } else {
 -            switch (nbt.getScanRunState()) {
 -              case QUEUED:
 -                state = ScanState.QUEUED;
 -                break;
 -              case FINISHED:
 -                state = ScanState.IDLE;
 -                break;
 -              case RUNNING:
 -              default:
 -                /* do nothing */
 -                break;
 +  public TabletServer(ServerConfigurationFactory confFactory, VolumeManager 
fs) {
 +    super(confFactory);
 +    this.confFactory = confFactory;
 +    this.fs = fs;
 +    AccumuloConfiguration aconf = getConfiguration();
 +    Instance instance = getInstance();
 +    log.info("Version " + Constants.VERSION);
 +    log.info("Instance " + instance.getInstanceID());
 +    this.sessionManager = new SessionManager(aconf);
 +    this.logSorter = new LogSorter(instance, fs, aconf);
 +    this.replWorker = new ReplicationWorker(this, fs);
 +    this.statsKeeper = new TabletStatsKeeper();
 +    SimpleTimer.getInstance(aconf).schedule(new Runnable() {
 +      @Override
 +      public void run() {
 +        synchronized (onlineTablets) {
 +          long now = System.currentTimeMillis();
 +          for (Tablet tablet : onlineTablets.values())
 +            try {
 +              tablet.updateRates(now);
 +            } catch (Exception ex) {
 +              log.error("Error updating rates for {}", tablet.getExtent(), 
ex);
              }
 -          }
 -
 -          activeScans.add(new ActiveScan(mss.client, mss.user, 
mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - 
mss.lastAccessTime,
 -              ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), 
Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, 
mss.auths
 -                  .getAuthorizationsBB()));
          }
        }
 +    }, 5000, 5000);
  
 -      return activeScans;
 -    }
 -  }
 -
 -  static class TservConstraintEnv implements Environment {
 -
 -    private TCredentials credentials;
 -    private SecurityOperation security;
 -    private Authorizations auths;
 -    private KeyExtent ke;
 -
 -    TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) {
 -      this.security = secOp;
 -      this.credentials = credentials;
 -    }
 -
 -    void setExtent(KeyExtent ke) {
 -      this.ke = ke;
 -    }
 -
 -    @Override
 -    public KeyExtent getExtent() {
 -      return ke;
 -    }
 -
 -    @Override
 -    public String getUser() {
 -      return credentials.getPrincipal();
 -    }
 -
 -    @Override
 -    @Deprecated
 -    public Authorizations getAuthorizations() {
 -      if (auths == null)
 -        try {
 -          this.auths = security.getUserAuthorizations(credentials);
 -        } catch (ThriftSecurityException e) {
 -          throw new RuntimeException(e);
 -        }
 -      return auths;
 -    }
 -
 -    @Override
 -    public AuthorizationContainer getAuthorizationsContainer() {
 -      return new AuthorizationContainer() {
 -        @Override
 -        public boolean contains(ByteSequence auth) {
 -          try {
 -            return security.userHasAuthorizations(credentials,
 -                Collections.<ByteBuffer> 
singletonList(ByteBuffer.wrap(auth.getBackingArray(), auth.offset(), 
auth.length())));
 -          } catch (ThriftSecurityException e) {
 -            throw new RuntimeException(e);
 -          }
 -        }
 -      };
 -    }
 -  }
 -
 -  private abstract class ScanTask<T> implements RunnableFuture<T> {
 -
 -    protected AtomicBoolean interruptFlag;
 -    protected ArrayBlockingQueue<Object> resultQueue;
 -    protected AtomicInteger state;
 -    protected AtomicReference<ScanRunState> runState;
 -
 -    private static final int INITIAL = 1;
 -    private static final int ADDED = 2;
 -    private static final int CANCELED = 3;
 -
 -    ScanTask() {
 -      interruptFlag = new AtomicBoolean(false);
 -      runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED);
 -      state = new AtomicInteger(INITIAL);
 -      resultQueue = new ArrayBlockingQueue<Object>(1);
 -    }
 -
 -    protected void addResult(Object o) {
 -      if (state.compareAndSet(INITIAL, ADDED))
 -        resultQueue.add(o);
 -      else if (state.get() == ADDED)
 -        throw new IllegalStateException("Tried to add more than one result");
 -    }
 -
 -    @Override
 -    public boolean cancel(boolean mayInterruptIfRunning) {
 -      if (!mayInterruptIfRunning)
 -        throw new IllegalArgumentException("Cancel will always attempt to 
interupt running next batch task");
 -
 -      if (state.get() == CANCELED)
 -        return true;
 -
 -      if (state.compareAndSet(INITIAL, CANCELED)) {
 -        interruptFlag.set(true);
 -        resultQueue = null;
 -        return true;
 -      }
 -
 -      return false;
 -    }
 -
 -    @Override
 -    public T get() throws InterruptedException, ExecutionException {
 -      throw new UnsupportedOperationException();
 -    }
 -
 -    @SuppressWarnings("unchecked")
 -    @Override
 -    public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException {
 -
 -      ArrayBlockingQueue<Object> localRQ = resultQueue;
 -
 -      if (state.get() == CANCELED)
 -        throw new CancellationException();
 -
 -      if (localRQ == null) {
 -        int st = state.get();
 -        String stateStr;
 -        switch (st) {
 -          case ADDED:
 -            stateStr = "ADDED";
 -            break;
 -          case CANCELED:
 -            stateStr = "CANCELED";
 -            break;
 -          case INITIAL:
 -            stateStr = "INITIAL";
 -            break;
 -          default:
 -            stateStr = "UNKNOWN";
 -            break;
 -        }
 -        throw new IllegalStateException("Tried to get result twice [state=" + 
stateStr + "(" + st + ")]");
 -      }
 -
 -      Object r = localRQ.poll(timeout, unit);
 -
 -      // could have been canceled while waiting
 -      if (state.get() == CANCELED) {
 -        if (r != null)
 -          throw new IllegalStateException("Nothing should have been added 
when in canceled state");
 +    final long walogMaxSize = 
aconf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE);
++    final long walogMaxAge = 
aconf.getTimeInMillis(Property.TSERV_WALOG_MAX_AGE);
 +    final long minBlockSize = 
CachedConfiguration.getInstance().getLong("dfs.namenode.fs-limits.min-block-size",
 0);
 +    if (minBlockSize != 0 && minBlockSize > walogMaxSize)
 +      throw new RuntimeException("Unable to start TabletServer. Logger is set 
to use blocksize " + walogMaxSize + " but hdfs minimum block size is "
 +          + minBlockSize + ". Either increase the " + 
Property.TSERV_WALOG_MAX_SIZE + " or decrease 
dfs.namenode.fs-limits.min-block-size in hdfs-site.xml.");
  
 -        throw new CancellationException();
 +    final long toleratedWalCreationFailures = 
aconf.getCount(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES);
 +    final long walCreationFailureRetryIncrement = 
aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT);
 +    final long walCreationFailureRetryMax = 
aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION);
 +    // Tolerate `toleratedWalCreationFailures` failures, waiting 
`walCreationFailureRetryIncrement` milliseconds after the first failure,
 +    // incrementing the next wait period by the same value, up to a maximum 
wait of `walCreationFailureRetryMax` milliseconds.
 +    final RetryFactory walCreationRetryFactory = new 
RetryFactory(toleratedWalCreationFailures, walCreationFailureRetryIncrement,
 +        walCreationFailureRetryIncrement, walCreationFailureRetryMax);
 +
-     logger = new TabletServerLogger(this, walogMaxSize, syncCounter, 
flushCounter, walCreationRetryFactory);
++    logger = new TabletServerLogger(this, walogMaxSize, syncCounter, 
flushCounter, walCreationRetryFactory, walogMaxAge);
 +    this.resourceManager = new TabletServerResourceManager(this, fs);
 +    this.security = AuditedSecurityOperation.getInstance(this);
 +
 +    metricsFactory = new TabletServerMetricsFactory(aconf);
 +    updateMetrics = metricsFactory.createUpdateMetrics();
 +    scanMetrics = metricsFactory.createScanMetrics();
 +    mincMetrics = metricsFactory.createMincMetrics();
 +    SimpleTimer.getInstance(aconf).schedule(new Runnable() {
 +      @Override
 +      public void run() {
 +        TabletLocator.clearLocators();
        }
 +    }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), 
jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
  
 -      if (r == null)
 -        throw new TimeoutException();
 -
 -      // make this method stop working now that something is being
 -      // returned
 -      resultQueue = null;
 -
 -      if (r instanceof Throwable)
 -        throw new ExecutionException((Throwable) r);
 -
 -      return (T) r;
 -    }
 -
 -    @Override
 -    public boolean isCancelled() {
 -      return state.get() == CANCELED;
 -    }
 -
 -    @Override
 -    public boolean isDone() {
 -      return runState.get().equals(ScanRunState.FINISHED);
 -    }
 -
 -    public ScanRunState getScanRunState() {
 -      return runState.get();
 -    }
 -
 -  }
 -
 -  private static class ConditionalSession extends Session {
 -    public TCredentials credentials;
 -    public Authorizations auths;
 -    public String tableId;
 -    public AtomicBoolean interruptFlag;
 -
 -    @Override
 -    public boolean cleanup() {
 -      interruptFlag.set(true);
 -      return true;
 +    // Create the secret manager
 +    setSecretManager(new AuthenticationTokenSecretManager(instance, 
aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME)));
 +    if (aconf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
 +      log.info("SASL is enabled, creating ZooKeeper watcher for 
AuthenticationKeys");
 +      // Watcher to notice new AuthenticationKeys which enable delegation 
tokens
 +      authKeyWatcher = new ZooAuthenticationKeyWatcher(getSecretManager(), 
ZooReaderWriter.getInstance(), ZooUtil.getRoot(instance)
 +          + Constants.ZDELEGATION_TOKEN_KEYS);
 +    } else {
 +      authKeyWatcher = null;
      }
    }
  

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2b286ba5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 11585f2,158fdbd..c7b6c98
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@@ -87,11 -77,15 +88,13 @@@ public class TabletServerLogger 
  
    private final AtomicInteger seqGen = new AtomicInteger();
  
 +  private final AtomicLong syncCounter;
 +  private final AtomicLong flushCounter;
 +
+   private long createTime = 0;
+ 
 -  private static boolean enabled(Tablet tablet) {
 -    return 
tablet.getTableConfiguration().getBoolean(Property.TABLE_WALOG_ENABLED);
 -  }
 -
 -  private static boolean enabled(CommitSession commitSession) {
 -    return enabled(commitSession.getTablet());
 -  }
 +  private final RetryFactory retryFactory;
 +  private Retry retry = null;
  
    static private abstract class TestCallWithWriteLock {
      abstract boolean test();
@@@ -136,13 -130,10 +139,14 @@@
      }
    }
  
-   public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong 
syncCounter, AtomicLong flushCounter, RetryFactory retryFactory) {
 -  public TabletServerLogger(TabletServer tserver, long maxSize, long maxAge) {
++  public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong 
syncCounter, AtomicLong flushCounter, RetryFactory retryFactory, long maxAge) {
      this.tserver = tserver;
      this.maxSize = maxSize;
 +    this.syncCounter = syncCounter;
 +    this.flushCounter = flushCounter;
 +    this.retryFactory = retryFactory;
 +    this.retry = null;
+     this.maxAge = maxAge;
    }
  
    private int initializeLoggers(final List<DfsLogger> copy) throws 
IOException {
@@@ -200,38 -189,19 +204,39 @@@
        alog.open(tserver.getClientAddressString());
        loggers.add(alog);
        logSetId.incrementAndGet();
 +
 +      // When we successfully create a WAL, make sure to reset the Retry.
 +      if (null != retry) {
 +        retry = null;
 +      }
 +
+       this.createTime = System.currentTimeMillis();
        return;
      } catch (Exception t) {
 -      throw new RuntimeException(t);
 -    }
 -  }
 +      if (null == retry) {
 +        retry = retryFactory.create();
 +      }
  
 -  public void resetLoggers() throws IOException {
 -    logSetLock.writeLock().lock();
 -    try {
 -      close();
 -    } finally {
 -      logSetLock.writeLock().unlock();
 +      // We have more retries or we exceeded the maximum number of accepted 
failures
 +      if (retry.canRetry()) {
 +        // Use the retry and record the time in which we did so
 +        retry.useRetry();
 +
 +        try {
 +          // Backoff
 +          retry.waitForNextAttempt();
 +        } catch (InterruptedException e) {
 +          Thread.currentThread().interrupt();
 +          throw new RuntimeException(e);
 +        }
 +      } else {
 +        log.error("Repeatedly failed to create WAL. Going to exit 
tabletserver.", t);
 +        // We didn't have retries or we failed too many times.
 +        Halt.halt("Experienced too many errors creating WALs, giving up");
 +      }
 +
 +      // The exception will trigger the log creation to be re-attempted.
 +      throw new RuntimeException(t);
      }
    }
  
@@@ -358,18 -317,15 +363,18 @@@
          });
        }
      }
-     // if the log gets too big, reset it .. grab the write lock first
+     // if the log gets too big or too old, reset it .. grab the write lock 
first
      logSizeEstimate.addAndGet(4 * 3); // event, tid, seq overhead
      testLockAndRun(logSetLock, new TestCallWithWriteLock() {
 +      @Override
        boolean test() {
-         return logSizeEstimate.get() > maxSize;
+         return (logSizeEstimate.get() > maxSize) || 
((System.currentTimeMillis() - createTime) > maxAge);
        }
  
 +      @Override
        void withWriteLock() throws IOException {
          close();
 +        closeForReplication(sessions);
        }
      });
      return seq;

Reply via email to