Merge branch '1.6' into 1.7
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2b286ba5 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2b286ba5 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2b286ba5 Branch: refs/heads/master Commit: 2b286ba5660ec78e5f1cb675fde55ccdeba6a327 Parents: cd5eb1f 0206d78 Author: Dave Marion <dlmar...@apache.org> Authored: Thu Mar 31 16:23:03 2016 -0400 Committer: Dave Marion <dlmar...@apache.org> Committed: Thu Mar 31 16:23:03 2016 -0400 ---------------------------------------------------------------------- .../java/org/apache/accumulo/core/conf/Property.java | 3 +-- .../java/org/apache/accumulo/tserver/TabletServer.java | 3 ++- .../apache/accumulo/tserver/log/TabletServerLogger.java | 11 ++++++++--- 3 files changed, 11 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/2b286ba5/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index 28cc861,2149ad9..dbb2036 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -252,16 -209,7 +252,15 @@@ public enum Property + "must be made, which is slower. However opening too many files at once can cause problems."), TSERV_WALOG_MAX_SIZE("tserver.walog.max.size", "1G", PropertyType.MEMORY, "The maximum size for each write-ahead log. See comment for property tserver.memory.maps.max"), - + TSERV_WALOG_MAX_AGE("tserver.walog.max.age", "24h", PropertyType.TIMEDURATION, "The maximum age for each write-ahead log."), + TSERV_WALOG_TOLERATED_CREATION_FAILURES("tserver.walog.tolerated.creation.failures", "50", PropertyType.COUNT, + "The maximum number of failures tolerated when creating a new WAL file within the period specified by tserver.walog.failures.period." + + " Exceeding this number of failures in the period causes the TabletServer to exit."), + TSERV_WALOG_TOLERATED_WAIT_INCREMENT("tserver.walog.tolerated.wait.increment", "1000ms", PropertyType.TIMEDURATION, + "The amount of time to wait between failures to create a WALog."), + // Never wait longer than 5 mins for a retry + TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION("tserver.walog.maximum.wait.duration", "5m", PropertyType.TIMEDURATION, + "The maximum amount of time to wait after a failure to create a WAL file."), - TSERV_MAJC_DELAY("tserver.compaction.major.delay", "30s", PropertyType.TIMEDURATION, "Time a tablet server will sleep between checking which tablets need compaction."), TSERV_MAJC_THREAD_MAXOPEN("tserver.compaction.major.thread.files.open.max", "10", PropertyType.COUNT, http://git-wip-us.apache.org/repos/asf/accumulo/blob/2b286ba5/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index eabf51d,38bd8ac..a3b224f --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -254,132 -257,634 +254,133 @@@ public class TabletServer extends Accum private static final long TIME_BETWEEN_GC_CHECKS = 5000; private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = 60 * 60 * 1000; - private TabletServerLogger logger; - - protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics(); + private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); + private final TransactionWatcher watcher = new TransactionWatcher(); + private final ZooCache masterLockCache = new ZooCache(); - private ServerConfiguration serverConfig; - private LogSorter logSorter = null; + private final TabletServerLogger logger; - public TabletServer(ServerConfiguration conf, VolumeManager fs) { - super(); - this.serverConfig = conf; - this.instance = conf.getInstance(); - this.fs = fs; - - log.info("Version " + Constants.VERSION); - log.info("Instance " + instance.getInstanceID()); - - this.logSorter = new LogSorter(instance, fs, getSystemConfiguration()); - SimpleTimer.getInstance().schedule(new Runnable() { - @Override - public void run() { - synchronized (onlineTablets) { - long now = System.currentTimeMillis(); - for (Tablet tablet : onlineTablets.values()) - try { - tablet.updateRates(now); - } catch (Exception ex) { - log.error(ex, ex); - } - } - } - }, TIME_BETWEEN_GC_CHECKS, TIME_BETWEEN_GC_CHECKS); - SimpleTimer.getInstance().schedule(new Runnable() { - @Override - public void run() { - TabletLocator.clearLocators(); - } - }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS)); - } + private final TabletServerMetricsFactory metricsFactory; + private final Metrics updateMetrics; + private final Metrics scanMetrics; + private final Metrics mincMetrics; - private static long jitter(long ms) { - Random r = new Random(); - // add a random 10% wait - return (long) ((1. + (r.nextDouble() / 10)) * ms); + public Metrics getMinCMetrics() { + return mincMetrics; } - private synchronized static void logGCInfo(AccumuloConfiguration conf) { - long now = System.currentTimeMillis(); - - List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); - Runtime rt = Runtime.getRuntime(); - - StringBuilder sb = new StringBuilder("gc"); - - boolean sawChange = false; - - long maxIncreaseInCollectionTime = 0; - - for (GarbageCollectorMXBean gcBean : gcmBeans) { - Long prevTime = prevGcTime.get(gcBean.getName()); - long pt = 0; - if (prevTime != null) { - pt = prevTime; - } - - long time = gcBean.getCollectionTime(); - - if (time - pt != 0) { - sawChange = true; - } - - long increaseInCollectionTime = time - pt; - sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), time / 1000.0, increaseInCollectionTime / 1000.0)); - maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime); - prevGcTime.put(gcBean.getName(), time); - } - - long mem = rt.freeMemory(); - if (maxIncreaseInCollectionTime == 0) { - gcTimeIncreasedCount = 0; - } else { - gcTimeIncreasedCount++; - if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 0.05) { - log.warn("Running low on memory"); - gcTimeIncreasedCount = 0; - } - } - - if (mem > lastMemorySize) { - sawChange = true; - } - - String sign = "+"; - if (mem - lastMemorySize <= 0) { - sign = ""; - } - - sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize), rt.totalMemory())); - - if (sawChange) { - log.debug(sb.toString()); - } - - final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); - if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) { - long diff = now - lastMemoryCheckTime; - if (diff > keepAliveTimeout + 1000) { - log.warn(String.format("GC pause checker not called in a timely fashion. Expected every %.1f seconds but was %.1f seconds since last check", - TIME_BETWEEN_GC_CHECKS / 1000., diff / 1000.)); - } - lastMemoryCheckTime = now; - return; - } - - if (maxIncreaseInCollectionTime > keepAliveTimeout) { - Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1); - } - - lastMemorySize = mem; - lastMemoryCheckTime = now; - } - - private TabletStatsKeeper statsKeeper; - - private static class Session { - long lastAccessTime; - long startTime; - String user; - String client = TServerUtils.clientAddress.get(); - public boolean reserved; - - public boolean cleanup() { - return true; - } - } - - private static class SessionManager { - - SecureRandom random; - Map<Long,Session> sessions; - private long maxIdle; - private long maxUpdateIdle; - private List<Session> idleSessions = new ArrayList<Session>(); - private final Long expiredSessionMarker = Long.valueOf(-1); - - SessionManager(AccumuloConfiguration conf) { - random = new SecureRandom(); - sessions = new HashMap<Long,Session>(); - maxUpdateIdle = conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE); - maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); - - Runnable r = new Runnable() { - @Override - public void run() { - sweep(maxIdle, maxUpdateIdle); - } - }; - - SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000)); - } - - synchronized long createSession(Session session, boolean reserve) { - long sid = random.nextLong(); - - while (sessions.containsKey(sid)) { - sid = random.nextLong(); - } - - sessions.put(sid, session); - - session.reserved = reserve; - - session.startTime = session.lastAccessTime = System.currentTimeMillis(); - - return sid; - } - - long getMaxIdleTime() { - return maxIdle; - } - - /** - * while a session is reserved, it cannot be canceled or removed - */ - synchronized Session reserveSession(long sessionId) { - Session session = sessions.get(sessionId); - if (session != null) { - if (session.reserved) - throw new IllegalStateException(); - session.reserved = true; - } - - return session; - - } - - synchronized Session reserveSession(long sessionId, boolean wait) { - Session session = sessions.get(sessionId); - if (session != null) { - while (wait && session.reserved) { - try { - wait(1000); - } catch (InterruptedException e) { - throw new RuntimeException(); - } - } - - if (session.reserved) - throw new IllegalStateException(); - session.reserved = true; - } - - return session; - - } - - synchronized void unreserveSession(Session session) { - if (!session.reserved) - throw new IllegalStateException(); - notifyAll(); - session.reserved = false; - session.lastAccessTime = System.currentTimeMillis(); - } - - synchronized void unreserveSession(long sessionId) { - Session session = getSession(sessionId); - if (session != null) - unreserveSession(session); - } - - synchronized Session getSession(long sessionId) { - Session session = sessions.get(sessionId); - if (session != null) - session.lastAccessTime = System.currentTimeMillis(); - return session; - } - - Session removeSession(long sessionId) { - return removeSession(sessionId, false); - } - - Session removeSession(long sessionId, boolean unreserve) { - Session session = null; - synchronized (this) { - session = sessions.remove(sessionId); - if (unreserve && session != null) - unreserveSession(session); - } - - // do clean up out side of lock.. - if (session != null) - session.cleanup(); - - return session; - } - - private void sweep(final long maxIdle, final long maxUpdateIdle) { - ArrayList<Session> sessionsToCleanup = new ArrayList<Session>(); - synchronized (this) { - Iterator<Session> iter = sessions.values().iterator(); - while (iter.hasNext()) { - Session session = iter.next(); - long configuredIdle = maxIdle; - if (session instanceof UpdateSession) { - configuredIdle = maxUpdateIdle; - } - long idleTime = System.currentTimeMillis() - session.lastAccessTime; - if (idleTime > configuredIdle && !session.reserved) { - log.info("Closing idle session from user=" + session.user + ", client=" + session.client + ", idle=" + idleTime + "ms"); - iter.remove(); - sessionsToCleanup.add(session); - } - } - } - - // do clean up outside of lock for TabletServer in a synchronized block for simplicity vice a synchronized list - - synchronized (idleSessions) { - - sessionsToCleanup.addAll(idleSessions); - - idleSessions.clear(); - - // perform cleanup for all of the sessions - for (Session session : sessionsToCleanup) { - if (!session.cleanup()) - idleSessions.add(session); - } - } - - } - - synchronized void removeIfNotAccessed(final long sessionId, final long delay) { - Session session = sessions.get(sessionId); - if (session != null) { - final long removeTime = session.lastAccessTime; - TimerTask r = new TimerTask() { - @Override - public void run() { - Session sessionToCleanup = null; - synchronized (SessionManager.this) { - Session session2 = sessions.get(sessionId); - if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) { - log.info("Closing not accessed session from user=" + session2.user + ", client=" + session2.client + ", duration=" + delay + "ms"); - sessions.remove(sessionId); - sessionToCleanup = session2; - } - } - - // call clean up outside of lock - if (sessionToCleanup != null) - sessionToCleanup.cleanup(); - } - }; - - SimpleTimer.getInstance().schedule(r, delay); - } - } - - public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() { - Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>(); - Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>(); - - synchronized (idleSessions) { - /** - * Add sessions so that get the list returned in the active scans call - */ - for (Session session : idleSessions) { - copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); - } - } - - for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) { - - Session session = entry.getValue(); - @SuppressWarnings("rawtypes") - ScanTask nbt = null; - String tableID = null; - - if (session instanceof ScanSession) { - ScanSession ss = (ScanSession) session; - nbt = ss.nextBatchTask; - tableID = ss.extent.getTableId().toString(); - } else if (session instanceof MultiScanSession) { - MultiScanSession mss = (MultiScanSession) session; - nbt = mss.lookupTask; - tableID = mss.threadPoolExtent.getTableId().toString(); - } - - if (nbt == null) - continue; + private final LogSorter logSorter; + private ReplicationWorker replWorker = null; + private final TabletStatsKeeper statsKeeper; + private final AtomicInteger logIdGenerator = new AtomicInteger(); - ScanRunState srs = nbt.getScanRunState(); + private final AtomicLong flushCounter = new AtomicLong(0); + private final AtomicLong syncCounter = new AtomicLong(0); - if (srs == ScanRunState.FINISHED) - continue; + private final VolumeManager fs; - MapCounter<ScanRunState> stateCounts = counts.get(tableID); - if (stateCounts == null) { - stateCounts = new MapCounter<ScanRunState>(); - counts.put(tableID, stateCounts); - } + private final SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>()); + private final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>()); + private final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>()); + @SuppressWarnings("unchecked") + private final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap(1000)); - stateCounts.increment(srs, 1); - } + private final TabletServerResourceManager resourceManager; + private final SecurityOperation security; - return counts; - } + private final BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<MasterMessage>(); - public synchronized List<ActiveScan> getActiveScans() { + private Thread majorCompactorThread; - final List<ActiveScan> activeScans = new ArrayList<ActiveScan>(); - final long ct = System.currentTimeMillis(); - final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>(); + private HostAndPort replicationAddress; + private HostAndPort clientAddress; - synchronized (idleSessions) { - /** - * Add sessions so that get the list returned in the active scans call - */ - for (Session session : idleSessions) { - copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); - } - } + private volatile boolean serverStopRequested = false; + private volatile boolean majorCompactorDisabled = false; + private volatile boolean shutdownComplete = false; - for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) { - Session session = entry.getValue(); - if (session instanceof ScanSession) { - ScanSession ss = (ScanSession) session; + private ZooLock tabletServerLock; - ScanState state = ScanState.RUNNING; + private TServer server; + private TServer replServer; - ScanTask<ScanBatch> nbt = ss.nextBatchTask; - if (nbt == null) { - state = ScanState.IDLE; - } else { - switch (nbt.getScanRunState()) { - case QUEUED: - state = ScanState.QUEUED; - break; - case FINISHED: - state = ScanState.IDLE; - break; - case RUNNING: - default: - /* do nothing */ - break; - } - } + private DistributedWorkQueue bulkFailedCopyQ; - ActiveScan activeScan = new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, - ScanType.SINGLE, state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, - ss.auths.getAuthorizationsBB()); + private String lockID; - // scanId added by ACCUMULO-2641 is an optional thrift argument and not available in ActiveScan constructor - activeScan.setScanId(entry.getKey()); - activeScans.add(activeScan); + public static final AtomicLong seekCount = new AtomicLong(0); - } else if (session instanceof MultiScanSession) { - MultiScanSession mss = (MultiScanSession) session; + private final AtomicLong totalMinorCompactions = new AtomicLong(0); + private final ServerConfigurationFactory confFactory; - ScanState state = ScanState.RUNNING; + private final ZooAuthenticationKeyWatcher authKeyWatcher; - ScanTask<MultiScanResult> nbt = mss.lookupTask; - if (nbt == null) { - state = ScanState.IDLE; - } else { - switch (nbt.getScanRunState()) { - case QUEUED: - state = ScanState.QUEUED; - break; - case FINISHED: - state = ScanState.IDLE; - break; - case RUNNING: - default: - /* do nothing */ - break; + public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) { + super(confFactory); + this.confFactory = confFactory; + this.fs = fs; + AccumuloConfiguration aconf = getConfiguration(); + Instance instance = getInstance(); + log.info("Version " + Constants.VERSION); + log.info("Instance " + instance.getInstanceID()); + this.sessionManager = new SessionManager(aconf); + this.logSorter = new LogSorter(instance, fs, aconf); + this.replWorker = new ReplicationWorker(this, fs); + this.statsKeeper = new TabletStatsKeeper(); + SimpleTimer.getInstance(aconf).schedule(new Runnable() { + @Override + public void run() { + synchronized (onlineTablets) { + long now = System.currentTimeMillis(); + for (Tablet tablet : onlineTablets.values()) + try { + tablet.updateRates(now); + } catch (Exception ex) { + log.error("Error updating rates for {}", tablet.getExtent(), ex); } - } - - activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime, - ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths - .getAuthorizationsBB())); } } + }, 5000, 5000); - return activeScans; - } - } - - static class TservConstraintEnv implements Environment { - - private TCredentials credentials; - private SecurityOperation security; - private Authorizations auths; - private KeyExtent ke; - - TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) { - this.security = secOp; - this.credentials = credentials; - } - - void setExtent(KeyExtent ke) { - this.ke = ke; - } - - @Override - public KeyExtent getExtent() { - return ke; - } - - @Override - public String getUser() { - return credentials.getPrincipal(); - } - - @Override - @Deprecated - public Authorizations getAuthorizations() { - if (auths == null) - try { - this.auths = security.getUserAuthorizations(credentials); - } catch (ThriftSecurityException e) { - throw new RuntimeException(e); - } - return auths; - } - - @Override - public AuthorizationContainer getAuthorizationsContainer() { - return new AuthorizationContainer() { - @Override - public boolean contains(ByteSequence auth) { - try { - return security.userHasAuthorizations(credentials, - Collections.<ByteBuffer> singletonList(ByteBuffer.wrap(auth.getBackingArray(), auth.offset(), auth.length()))); - } catch (ThriftSecurityException e) { - throw new RuntimeException(e); - } - } - }; - } - } - - private abstract class ScanTask<T> implements RunnableFuture<T> { - - protected AtomicBoolean interruptFlag; - protected ArrayBlockingQueue<Object> resultQueue; - protected AtomicInteger state; - protected AtomicReference<ScanRunState> runState; - - private static final int INITIAL = 1; - private static final int ADDED = 2; - private static final int CANCELED = 3; - - ScanTask() { - interruptFlag = new AtomicBoolean(false); - runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED); - state = new AtomicInteger(INITIAL); - resultQueue = new ArrayBlockingQueue<Object>(1); - } - - protected void addResult(Object o) { - if (state.compareAndSet(INITIAL, ADDED)) - resultQueue.add(o); - else if (state.get() == ADDED) - throw new IllegalStateException("Tried to add more than one result"); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (!mayInterruptIfRunning) - throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task"); - - if (state.get() == CANCELED) - return true; - - if (state.compareAndSet(INITIAL, CANCELED)) { - interruptFlag.set(true); - resultQueue = null; - return true; - } - - return false; - } - - @Override - public T get() throws InterruptedException, ExecutionException { - throw new UnsupportedOperationException(); - } - - @SuppressWarnings("unchecked") - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - - ArrayBlockingQueue<Object> localRQ = resultQueue; - - if (state.get() == CANCELED) - throw new CancellationException(); - - if (localRQ == null) { - int st = state.get(); - String stateStr; - switch (st) { - case ADDED: - stateStr = "ADDED"; - break; - case CANCELED: - stateStr = "CANCELED"; - break; - case INITIAL: - stateStr = "INITIAL"; - break; - default: - stateStr = "UNKNOWN"; - break; - } - throw new IllegalStateException("Tried to get result twice [state=" + stateStr + "(" + st + ")]"); - } - - Object r = localRQ.poll(timeout, unit); - - // could have been canceled while waiting - if (state.get() == CANCELED) { - if (r != null) - throw new IllegalStateException("Nothing should have been added when in canceled state"); + final long walogMaxSize = aconf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE); ++ final long walogMaxAge = aconf.getTimeInMillis(Property.TSERV_WALOG_MAX_AGE); + final long minBlockSize = CachedConfiguration.getInstance().getLong("dfs.namenode.fs-limits.min-block-size", 0); + if (minBlockSize != 0 && minBlockSize > walogMaxSize) + throw new RuntimeException("Unable to start TabletServer. Logger is set to use blocksize " + walogMaxSize + " but hdfs minimum block size is " + + minBlockSize + ". Either increase the " + Property.TSERV_WALOG_MAX_SIZE + " or decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml."); - throw new CancellationException(); + final long toleratedWalCreationFailures = aconf.getCount(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES); + final long walCreationFailureRetryIncrement = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT); + final long walCreationFailureRetryMax = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION); + // Tolerate `toleratedWalCreationFailures` failures, waiting `walCreationFailureRetryIncrement` milliseconds after the first failure, + // incrementing the next wait period by the same value, for a maximum of `walCreationFailureRetryMax` retries. + final RetryFactory walCreationRetryFactory = new RetryFactory(toleratedWalCreationFailures, walCreationFailureRetryIncrement, + walCreationFailureRetryIncrement, walCreationFailureRetryMax); + - logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter, walCreationRetryFactory); ++ logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter, walCreationRetryFactory, walogMaxAge); + this.resourceManager = new TabletServerResourceManager(this, fs); + this.security = AuditedSecurityOperation.getInstance(this); + + metricsFactory = new TabletServerMetricsFactory(aconf); + updateMetrics = metricsFactory.createUpdateMetrics(); + scanMetrics = metricsFactory.createScanMetrics(); + mincMetrics = metricsFactory.createMincMetrics(); + SimpleTimer.getInstance(aconf).schedule(new Runnable() { + @Override + public void run() { + TabletLocator.clearLocators(); } + }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS)); - if (r == null) - throw new TimeoutException(); - - // make this method stop working now that something is being - // returned - resultQueue = null; - - if (r instanceof Throwable) - throw new ExecutionException((Throwable) r); - - return (T) r; - } - - @Override - public boolean isCancelled() { - return state.get() == CANCELED; - } - - @Override - public boolean isDone() { - return runState.get().equals(ScanRunState.FINISHED); - } - - public ScanRunState getScanRunState() { - return runState.get(); - } - - } - - private static class ConditionalSession extends Session { - public TCredentials credentials; - public Authorizations auths; - public String tableId; - public AtomicBoolean interruptFlag; - - @Override - public boolean cleanup() { - interruptFlag.set(true); - return true; + // Create the secret manager + setSecretManager(new AuthenticationTokenSecretManager(instance, aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME))); + if (aconf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + log.info("SASL is enabled, creating ZooKeeper watcher for AuthenticationKeys"); + // Watcher to notice new AuthenticationKeys which enable delegation tokens + authKeyWatcher = new ZooAuthenticationKeyWatcher(getSecretManager(), ZooReaderWriter.getInstance(), ZooUtil.getRoot(instance) + + Constants.ZDELEGATION_TOKEN_KEYS); + } else { + authKeyWatcher = null; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/2b286ba5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 11585f2,158fdbd..c7b6c98 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@@ -87,11 -77,15 +88,13 @@@ public class TabletServerLogger private final AtomicInteger seqGen = new AtomicInteger(); + private final AtomicLong syncCounter; + private final AtomicLong flushCounter; + + private long createTime = 0; + - private static boolean enabled(Tablet tablet) { - return tablet.getTableConfiguration().getBoolean(Property.TABLE_WALOG_ENABLED); - } - - private static boolean enabled(CommitSession commitSession) { - return enabled(commitSession.getTablet()); - } + private final RetryFactory retryFactory; + private Retry retry = null; static private abstract class TestCallWithWriteLock { abstract boolean test(); @@@ -136,13 -130,10 +139,14 @@@ } } - public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong syncCounter, AtomicLong flushCounter, RetryFactory retryFactory) { - public TabletServerLogger(TabletServer tserver, long maxSize, long maxAge) { ++ public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong syncCounter, AtomicLong flushCounter, RetryFactory retryFactory, long maxAge) { this.tserver = tserver; this.maxSize = maxSize; + this.syncCounter = syncCounter; + this.flushCounter = flushCounter; + this.retryFactory = retryFactory; + this.retry = null; + this.maxAge = maxAge; } private int initializeLoggers(final List<DfsLogger> copy) throws IOException { @@@ -200,38 -189,19 +204,39 @@@ alog.open(tserver.getClientAddressString()); loggers.add(alog); logSetId.incrementAndGet(); + + // When we successfully create a WAL, make sure to reset the Retry. + if (null != retry) { + retry = null; + } + + this.createTime = System.currentTimeMillis(); return; } catch (Exception t) { - throw new RuntimeException(t); - } - } + if (null == retry) { + retry = retryFactory.create(); + } - public void resetLoggers() throws IOException { - logSetLock.writeLock().lock(); - try { - close(); - } finally { - logSetLock.writeLock().unlock(); + // We have more retries or we exceeded the maximum number of accepted failures + if (retry.canRetry()) { + // Use the retry and record the time in which we did so + retry.useRetry(); + + try { + // Backoff + retry.waitForNextAttempt(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } else { + log.error("Repeatedly failed to create WAL. Going to exit tabletserver.", t); + // We didn't have retries or we failed too many times. + Halt.halt("Experienced too many errors creating WALs, giving up"); + } + + // The exception will trigger the log creation to be re-attempted. + throw new RuntimeException(t); } } @@@ -358,18 -317,15 +363,18 @@@ }); } } - // if the log gets too big, reset it .. grab the write lock first + // if the log gets too big or too old, reset it .. grab the write lock first logSizeEstimate.addAndGet(4 * 3); // event, tid, seq overhead testLockAndRun(logSetLock, new TestCallWithWriteLock() { + @Override boolean test() { - return logSizeEstimate.get() > maxSize; + return (logSizeEstimate.get() > maxSize) || ((System.currentTimeMillis() - createTime) > maxAge); } + @Override void withWriteLock() throws IOException { close(); + closeForReplication(sessions); } }); return seq;