Merge branch '1.6' into 1.7 Conflicts: server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/01cdd020 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/01cdd020 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/01cdd020 Branch: refs/heads/master Commit: 01cdd0205657a61d59a80294e0f22aa010fdf966 Parents: acf68ad e1e4100 Author: Josh Elser <els...@apache.org> Authored: Thu Dec 31 00:08:46 2015 -0500 Committer: Josh Elser <els...@apache.org> Committed: Thu Dec 31 00:08:46 2015 -0500 ---------------------------------------------------------------------- .../accumulo/core/client/BatchWriter.java | 5 ++++- .../file/blockfile/cache/LruBlockCache.java | 22 +++++++++----------- pom.xml | 2 +- .../accumulo/gc/SimpleGarbageCollector.java | 6 +++++- .../org/apache/accumulo/monitor/Monitor.java | 2 ++ .../org/apache/accumulo/tracer/TraceServer.java | 3 +++ .../apache/accumulo/tserver/TabletServer.java | 2 ++ .../accumulo/tserver/tablet/Compactor.java | 11 ++++++---- .../accumulo/tserver/tablet/MinorCompactor.java | 19 +++++++++-------- 9 files changed, 44 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/01cdd020/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/01cdd020/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/01cdd020/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 6bb8f9a,b98daf7..48633cb --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@@ -138,11 -133,16 +139,14 @@@ public class SimpleGarbageCollector ext private GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats()); - private Instance instance; - public static void main(String[] args) throws UnknownHostException, IOException { - SecurityUtil.serverLogin(ServerConfiguration.getSiteConfiguration()); + SecurityUtil.serverLogin(SiteConfiguration.getInstance()); final String app = "gc"; Accumulo.setupLogging(app); - ServerConfigurationFactory conf = new ServerConfigurationFactory(HdfsZooInstance.getInstance()); + Instance instance = HdfsZooInstance.getInstance(); ++ ServerConfigurationFactory conf = new ServerConfigurationFactory(instance); + log.info("Version " + Constants.VERSION); + log.info("Instance " + instance.getInstanceID()); - ServerConfiguration conf = new ServerConfiguration(instance); final VolumeManager fs = VolumeManagerImpl.get(); Accumulo.init(fs, conf, app); Opts opts = new Opts(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/01cdd020/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java ---------------------------------------------------------------------- diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 24e2e3f,4e162c3..c3dd773 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@@ -427,16 -432,13 +427,18 @@@ public class Monitor Accumulo.setupLogging(app); VolumeManager fs = VolumeManagerImpl.get(); instance = HdfsZooInstance.getInstance(); - config = new ServerConfiguration(instance); + config = new ServerConfigurationFactory(instance); + context = new AccumuloServerContext(config); + log.info("Version " + Constants.VERSION); + log.info("Instance " + instance.getInstanceID()); Accumulo.init(fs, config, app); Monitor monitor = new Monitor(); - Accumulo.enableTracing(hostname, app); - monitor.run(hostname); + DistributedTrace.enable(hostname, app, config.getConfiguration()); + try { + monitor.run(hostname); + } finally { + DistributedTrace.disable(); + } } private static long START_TIME; http://git-wip-us.apache.org/repos/asf/accumulo/blob/01cdd020/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java ---------------------------------------------------------------------- diff --cc server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java index 7511f3c,1df10a3..2d5d68d --- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java +++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java @@@ -175,8 -170,10 +176,10 @@@ public class TraceServer implements Wat } - public TraceServer(ServerConfiguration serverConfiguration, String hostname) throws Exception { + public TraceServer(ServerConfigurationFactory serverConfiguration, String hostname) throws Exception { this.serverConfiguration = serverConfiguration; + log.info("Version " + Constants.VERSION); + log.info("Instance " + serverConfiguration.getInstance().getInstanceID()); AccumuloConfiguration conf = serverConfiguration.getConfiguration(); table = conf.get(Property.TRACE_TABLE); Connector connector = null; http://git-wip-us.apache.org/repos/asf/accumulo/blob/01cdd020/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 1080d8d,29cf0d3..1e0d119 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -256,149 -255,734 +256,151 @@@ public class TabletServer extends Accum private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000; private static final long TIME_BETWEEN_GC_CHECKS = 5000; private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = 60 * 60 * 1000; + private static final Set<Column> EMPTY_COLUMNS = Collections.emptySet(); - private TabletServerLogger logger; - - protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics(); - - private ServerConfiguration serverConfig; - private LogSorter logSorter = null; - - public TabletServer(ServerConfiguration conf, VolumeManager fs) { - super(); - this.serverConfig = conf; - this.instance = conf.getInstance(); - this.fs = fs; - - log.info("Version " + Constants.VERSION); - log.info("Instance " + instance.getInstanceID()); - - this.logSorter = new LogSorter(instance, fs, getSystemConfiguration()); - SimpleTimer.getInstance().schedule(new Runnable() { - @Override - public void run() { - synchronized (onlineTablets) { - long now = System.currentTimeMillis(); - for (Tablet tablet : onlineTablets.values()) - try { - tablet.updateRates(now); - } catch (Exception ex) { - log.error(ex, ex); - } - } - } - }, TIME_BETWEEN_GC_CHECKS, TIME_BETWEEN_GC_CHECKS); - SimpleTimer.getInstance().schedule(new Runnable() { - @Override - public void run() { - TabletLocator.clearLocators(); - } - }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS)); - } - - private static long jitter(long ms) { - Random r = new Random(); - // add a random 10% wait - return (long) ((1. + (r.nextDouble() / 10)) * ms); - } - - private synchronized static void logGCInfo(AccumuloConfiguration conf) { - long now = System.currentTimeMillis(); - - List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); - Runtime rt = Runtime.getRuntime(); - - StringBuilder sb = new StringBuilder("gc"); - - boolean sawChange = false; - - long maxIncreaseInCollectionTime = 0; - - for (GarbageCollectorMXBean gcBean : gcmBeans) { - Long prevTime = prevGcTime.get(gcBean.getName()); - long pt = 0; - if (prevTime != null) { - pt = prevTime; - } - - long time = gcBean.getCollectionTime(); - - if (time - pt != 0) { - sawChange = true; - } - - long increaseInCollectionTime = time - pt; - sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), time / 1000.0, increaseInCollectionTime / 1000.0)); - maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime); - prevGcTime.put(gcBean.getName(), time); - } - - long mem = rt.freeMemory(); - if (maxIncreaseInCollectionTime == 0) { - gcTimeIncreasedCount = 0; - } else { - gcTimeIncreasedCount++; - if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 0.05) { - log.warn("Running low on memory"); - gcTimeIncreasedCount = 0; - } - } - - if (mem > lastMemorySize) { - sawChange = true; - } + private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); + private final TransactionWatcher watcher = new TransactionWatcher(); + private final ZooCache masterLockCache = new ZooCache(); - String sign = "+"; - if (mem - lastMemorySize <= 0) { - sign = ""; - } - - sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize), rt.totalMemory())); + private final TabletServerLogger logger; - if (sawChange) { - log.debug(sb.toString()); - } - - final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); - if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) { - long diff = now - lastMemoryCheckTime; - if (diff > keepAliveTimeout + 1000) { - log.warn(String.format("GC pause checker not called in a timely fashion. Expected every %.1f seconds but was %.1f seconds since last check", - TIME_BETWEEN_GC_CHECKS / 1000., diff / 1000.)); - } - lastMemoryCheckTime = now; - return; - } - - if (maxIncreaseInCollectionTime > keepAliveTimeout) { - Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1); - } + private final TabletServerMetricsFactory metricsFactory; + private final Metrics updateMetrics; + private final Metrics scanMetrics; + private final Metrics mincMetrics; - lastMemorySize = mem; - lastMemoryCheckTime = now; + public Metrics getMinCMetrics() { + return mincMetrics; } - private TabletStatsKeeper statsKeeper; - - private static class Session { - long lastAccessTime; - long startTime; - String user; - String client = TServerUtils.clientAddress.get(); - public boolean reserved; - - public void cleanup() {} - } - - private static class SessionManager { - - SecureRandom random; - Map<Long,Session> sessions; - long maxIdle; - - SessionManager(AccumuloConfiguration conf) { - random = new SecureRandom(); - sessions = new HashMap<Long,Session>(); - - maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); - - Runnable r = new Runnable() { - @Override - public void run() { - sweep(maxIdle); - } - }; - - SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000)); - } - - synchronized long createSession(Session session, boolean reserve) { - long sid = random.nextLong(); - - while (sessions.containsKey(sid)) { - sid = random.nextLong(); - } - - sessions.put(sid, session); - - session.reserved = reserve; - - session.startTime = session.lastAccessTime = System.currentTimeMillis(); - - return sid; - } - - long getMaxIdleTime() { - return maxIdle; - } - - /** - * while a session is reserved, it cannot be canceled or removed - */ - synchronized Session reserveSession(long sessionId) { - Session session = sessions.get(sessionId); - if (session != null) { - if (session.reserved) - throw new IllegalStateException(); - session.reserved = true; - } - - return session; - - } - - synchronized Session reserveSession(long sessionId, boolean wait) { - Session session = sessions.get(sessionId); - if (session != null) { - while (wait && session.reserved) { - try { - wait(1000); - } catch (InterruptedException e) { - throw new RuntimeException(); - } - } - - if (session.reserved) - throw new IllegalStateException(); - session.reserved = true; - } - - return session; - - } - - synchronized void unreserveSession(Session session) { - if (!session.reserved) - throw new IllegalStateException(); - notifyAll(); - session.reserved = false; - session.lastAccessTime = System.currentTimeMillis(); - } - - synchronized void unreserveSession(long sessionId) { - Session session = getSession(sessionId); - if (session != null) - unreserveSession(session); - } - - synchronized Session getSession(long sessionId) { - Session session = sessions.get(sessionId); - if (session != null) - session.lastAccessTime = System.currentTimeMillis(); - return session; - } - - Session removeSession(long sessionId) { - return removeSession(sessionId, false); - } - - Session removeSession(long sessionId, boolean unreserve) { - Session session = null; - synchronized (this) { - session = sessions.remove(sessionId); - if (unreserve && session != null) - unreserveSession(session); - } - - // do clean up out side of lock.. - if (session != null) - session.cleanup(); - - return session; - } - - private void sweep(long maxIdle) { - ArrayList<Session> sessionsToCleanup = new ArrayList<Session>(); - synchronized (this) { - Iterator<Session> iter = sessions.values().iterator(); - while (iter.hasNext()) { - Session session = iter.next(); - long idleTime = System.currentTimeMillis() - session.lastAccessTime; - if (idleTime > maxIdle && !session.reserved) { - log.info("Closing idle session from user=" + session.user + ", client=" + session.client + ", idle=" + idleTime + "ms"); - iter.remove(); - sessionsToCleanup.add(session); - } - } - } - - // do clean up outside of lock - for (Session session : sessionsToCleanup) { - session.cleanup(); - } - } - - synchronized void removeIfNotAccessed(final long sessionId, final long delay) { - Session session = sessions.get(sessionId); - if (session != null) { - final long removeTime = session.lastAccessTime; - TimerTask r = new TimerTask() { - @Override - public void run() { - Session sessionToCleanup = null; - synchronized (SessionManager.this) { - Session session2 = sessions.get(sessionId); - if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) { - log.info("Closing not accessed session from user=" + session2.user + ", client=" + session2.client + ", duration=" + delay + "ms"); - sessions.remove(sessionId); - sessionToCleanup = session2; - } - } - - // call clean up outside of lock - if (sessionToCleanup != null) - sessionToCleanup.cleanup(); - } - }; - - SimpleTimer.getInstance().schedule(r, delay); - } - } - - public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() { - Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>(); - for (Entry<Long,Session> entry : sessions.entrySet()) { - - Session session = entry.getValue(); - @SuppressWarnings("rawtypes") - ScanTask nbt = null; - String tableID = null; - - if (session instanceof ScanSession) { - ScanSession ss = (ScanSession) session; - nbt = ss.nextBatchTask; - tableID = ss.extent.getTableId().toString(); - } else if (session instanceof MultiScanSession) { - MultiScanSession mss = (MultiScanSession) session; - nbt = mss.lookupTask; - tableID = mss.threadPoolExtent.getTableId().toString(); - } - - if (nbt == null) - continue; + private final LogSorter logSorter; + private ReplicationWorker replWorker = null; + private final TabletStatsKeeper statsKeeper; + private final AtomicInteger logIdGenerator = new AtomicInteger(); - ScanRunState srs = nbt.getScanRunState(); + private final AtomicLong flushCounter = new AtomicLong(0); + private final AtomicLong syncCounter = new AtomicLong(0); - if (srs == ScanRunState.FINISHED) - continue; + private final VolumeManager fs; - MapCounter<ScanRunState> stateCounts = counts.get(tableID); - if (stateCounts == null) { - stateCounts = new MapCounter<ScanRunState>(); - counts.put(tableID, stateCounts); - } + private final SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>()); + private final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>()); + private final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>()); + @SuppressWarnings("unchecked") + private final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap(1000)); - stateCounts.increment(srs, 1); - } + private final TabletServerResourceManager resourceManager; + private final SecurityOperation security; - return counts; - } + private final BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<MasterMessage>(); - public synchronized List<ActiveScan> getActiveScans() { + private Thread majorCompactorThread; - ArrayList<ActiveScan> activeScans = new ArrayList<ActiveScan>(); + private HostAndPort replicationAddress; + private HostAndPort clientAddress; - long ct = System.currentTimeMillis(); + private volatile boolean serverStopRequested = false; + private volatile boolean majorCompactorDisabled = false; + private volatile boolean shutdownComplete = false; - for (Entry<Long,Session> entry : sessions.entrySet()) { - Session session = entry.getValue(); - if (session instanceof ScanSession) { - ScanSession ss = (ScanSession) session; + private ZooLock tabletServerLock; - ScanState state = ScanState.RUNNING; + private TServer server; + private TServer replServer; - ScanTask<ScanBatch> nbt = ss.nextBatchTask; - if (nbt == null) { - state = ScanState.IDLE; - } else { - switch (nbt.getScanRunState()) { - case QUEUED: - state = ScanState.QUEUED; - break; - case FINISHED: - state = ScanState.IDLE; - break; - case RUNNING: - default: - /* do nothing */ - break; - } - } + private DistributedWorkQueue bulkFailedCopyQ; - ActiveScan activeScan = new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, - ScanType.SINGLE, state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, - ss.auths.getAuthorizationsBB()); + private String lockID; - // scanId added by ACCUMULO-2641 is an optional thrift argument and not available in ActiveScan constructor - activeScan.setScanId(entry.getKey()); - activeScans.add(activeScan); + public static final AtomicLong seekCount = new AtomicLong(0); - } else if (session instanceof MultiScanSession) { - MultiScanSession mss = (MultiScanSession) session; + private final AtomicLong totalMinorCompactions = new AtomicLong(0); + private final ServerConfigurationFactory confFactory; - ScanState state = ScanState.RUNNING; + private final ZooAuthenticationKeyWatcher authKeyWatcher; - ScanTask<MultiScanResult> nbt = mss.lookupTask; - if (nbt == null) { - state = ScanState.IDLE; - } else { - switch (nbt.getScanRunState()) { - case QUEUED: - state = ScanState.QUEUED; - break; - case FINISHED: - state = ScanState.IDLE; - break; - case RUNNING: - default: - /* do nothing */ - break; + public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) { + super(confFactory); + this.confFactory = confFactory; + this.fs = fs; + AccumuloConfiguration aconf = getConfiguration(); + Instance instance = getInstance(); ++ log.info("Version " + Constants.VERSION); ++ log.info("Instance " + instance.getInstanceID()); + this.sessionManager = new SessionManager(aconf); + this.logSorter = new LogSorter(instance, fs, aconf); + this.replWorker = new ReplicationWorker(this, fs); + this.statsKeeper = new TabletStatsKeeper(); + SimpleTimer.getInstance(aconf).schedule(new Runnable() { + @Override + public void run() { + synchronized (onlineTablets) { + long now = System.currentTimeMillis(); + for (Tablet tablet : onlineTablets.values()) + try { + tablet.updateRates(now); + } catch (Exception ex) { + log.error("Error updating rates for {}", tablet.getExtent(), ex); } - } - - activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime, - ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths - .getAuthorizationsBB())); } } + }, 5000, 5000); - return activeScans; - } - } - - static class TservConstraintEnv implements Environment { - - private TCredentials credentials; - private SecurityOperation security; - private Authorizations auths; - private KeyExtent ke; - - TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) { - this.security = secOp; - this.credentials = credentials; - } - - void setExtent(KeyExtent ke) { - this.ke = ke; - } - - @Override - public KeyExtent getExtent() { - return ke; - } - - @Override - public String getUser() { - return credentials.getPrincipal(); - } - - @Override - @Deprecated - public Authorizations getAuthorizations() { - if (auths == null) - try { - this.auths = security.getUserAuthorizations(credentials); - } catch (ThriftSecurityException e) { - throw new RuntimeException(e); - } - return auths; - } - - @Override - public AuthorizationContainer getAuthorizationsContainer() { - return new AuthorizationContainer() { - @Override - public boolean contains(ByteSequence auth) { - try { - return security.userHasAuthorizations(credentials, - Collections.<ByteBuffer> singletonList(ByteBuffer.wrap(auth.getBackingArray(), auth.offset(), auth.length()))); - } catch (ThriftSecurityException e) { - throw new RuntimeException(e); - } - } - }; - } - } - - private abstract class ScanTask<T> implements RunnableFuture<T> { - - protected AtomicBoolean interruptFlag; - protected ArrayBlockingQueue<Object> resultQueue; - protected AtomicInteger state; - protected AtomicReference<ScanRunState> runState; - - private static final int INITIAL = 1; - private static final int ADDED = 2; - private static final int CANCELED = 3; - - ScanTask() { - interruptFlag = new AtomicBoolean(false); - runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED); - state = new AtomicInteger(INITIAL); - resultQueue = new ArrayBlockingQueue<Object>(1); - } - - protected void addResult(Object o) { - if (state.compareAndSet(INITIAL, ADDED)) - resultQueue.add(o); - else if (state.get() == ADDED) - throw new IllegalStateException("Tried to add more than one result"); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (!mayInterruptIfRunning) - throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task"); - - if (state.get() == CANCELED) - return true; - - if (state.compareAndSet(INITIAL, CANCELED)) { - interruptFlag.set(true); - resultQueue = null; - return true; - } - - return false; - } - - @Override - public T get() throws InterruptedException, ExecutionException { - throw new UnsupportedOperationException(); - } - - @SuppressWarnings("unchecked") - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - - ArrayBlockingQueue<Object> localRQ = resultQueue; - - if (state.get() == CANCELED) - throw new CancellationException(); - - if (localRQ == null) { - int st = state.get(); - String stateStr; - switch (st) { - case ADDED: - stateStr = "ADDED"; - break; - case CANCELED: - stateStr = "CANCELED"; - break; - case INITIAL: - stateStr = "INITIAL"; - break; - default: - stateStr = "UNKNOWN"; - break; - } - throw new IllegalStateException("Tried to get result twice [state=" + stateStr + "(" + st + ")]"); - } - - Object r = localRQ.poll(timeout, unit); - - // could have been canceled while waiting - if (state.get() == CANCELED) { - if (r != null) - throw new IllegalStateException("Nothing should have been added when in canceled state"); + final long walogMaxSize = aconf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE); + final long minBlockSize = CachedConfiguration.getInstance().getLong("dfs.namenode.fs-limits.min-block-size", 0); + if (minBlockSize != 0 && minBlockSize > walogMaxSize) + throw new RuntimeException("Unable to start TabletServer. Logger is set to use blocksize " + walogMaxSize + " but hdfs minimum block size is " + + minBlockSize + ". Either increase the " + Property.TSERV_WALOG_MAX_SIZE + " or decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml."); - throw new CancellationException(); + final long toleratedWalCreationFailures = aconf.getCount(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES); + final long walCreationFailureRetryIncrement = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT); + final long walCreationFailureRetryMax = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION); + // Tolerate `toleratedWalCreationFailures` failures, waiting `walCreationFailureRetryIncrement` milliseconds after the first failure, + // incrementing the next wait period by the same value, for a maximum of `walCreationFailureRetryMax` retries. + final RetryFactory walCreationRetryFactory = new RetryFactory(toleratedWalCreationFailures, walCreationFailureRetryIncrement, + walCreationFailureRetryIncrement, walCreationFailureRetryMax); + + logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter, walCreationRetryFactory); + this.resourceManager = new TabletServerResourceManager(this, fs); + this.security = AuditedSecurityOperation.getInstance(this); + + metricsFactory = new TabletServerMetricsFactory(aconf); + updateMetrics = metricsFactory.createUpdateMetrics(); + scanMetrics = metricsFactory.createScanMetrics(); + mincMetrics = metricsFactory.createMincMetrics(); + SimpleTimer.getInstance(aconf).schedule(new Runnable() { + @Override + public void run() { + TabletLocator.clearLocators(); } + }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS)); - if (r == null) - throw new TimeoutException(); - - // make this method stop working now that something is being - // returned - resultQueue = null; - - if (r instanceof Throwable) - throw new ExecutionException((Throwable) r); - - return (T) r; - } - - @Override - public boolean isCancelled() { - return state.get() == CANCELED; - } - - @Override - public boolean isDone() { - return runState.get().equals(ScanRunState.FINISHED); - } - - public ScanRunState getScanRunState() { - return runState.get(); - } - - } - - private static class ConditionalSession extends Session { - public TCredentials credentials; - public Authorizations auths; - public String tableId; - public AtomicBoolean interruptFlag; - - @Override - public void cleanup() { - interruptFlag.set(true); - } - } - - private static class UpdateSession extends Session { - public Tablet currentTablet; - public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>(); - Map<KeyExtent,Long> failures = new HashMap<KeyExtent,Long>(); - HashMap<KeyExtent,SecurityErrorCode> authFailures = new HashMap<KeyExtent,SecurityErrorCode>(); - public Violations violations; - public TCredentials credentials; - public long totalUpdates = 0; - public long flushTime = 0; - Stat prepareTimes = new Stat(); - Stat walogTimes = new Stat(); - Stat commitTimes = new Stat(); - Stat authTimes = new Stat(); - public Map<Tablet,List<Mutation>> queuedMutations = new HashMap<Tablet,List<Mutation>>(); - public long queuedMutationSize = 0; - TservConstraintEnv cenv = null; - } - - private static class ScanSession extends Session { - public KeyExtent extent; - public HashSet<Column> columnSet; - public List<IterInfo> ssiList; - public Map<String,Map<String,String>> ssio; - public Authorizations auths; - public long entriesReturned = 0; - public Stat nbTimes = new Stat(); - public long batchCount = 0; - public volatile ScanTask<ScanBatch> nextBatchTask; - public AtomicBoolean interruptFlag; - public Scanner scanner; - public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD; - - @Override - public void cleanup() { - try { - if (nextBatchTask != null) - nextBatchTask.cancel(true); - } finally { - if (scanner != null) - scanner.close(); - } + // Create the secret manager + setSecretManager(new AuthenticationTokenSecretManager(instance, aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME))); + if (aconf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + log.info("SASL is enabled, creating ZooKeeper watcher for AuthenticationKeys"); + // Watcher to notice new AuthenticationKeys which enable delegation tokens + authKeyWatcher = new ZooAuthenticationKeyWatcher(getSecretManager(), ZooReaderWriter.getInstance(), ZooUtil.getRoot(instance) + + Constants.ZDELEGATION_TOKEN_KEYS); + } else { + authKeyWatcher = null; } - } - private static class MultiScanSession extends Session { - HashSet<Column> columnSet; - Map<KeyExtent,List<Range>> queries; - public List<IterInfo> ssiList; - public Map<String,Map<String,String>> ssio; - public Authorizations auths; - - // stats - int numRanges; - int numTablets; - int numEntries; - long totalLookupTime; - - public volatile ScanTask<MultiScanResult> lookupTask; - public KeyExtent threadPoolExtent; - - @Override - public void cleanup() { - if (lookupTask != null) - lookupTask.cancel(true); - } + private static long jitter(long ms) { + Random r = new Random(); + // add a random 10% wait + return (long) ((1. + (r.nextDouble() / 10)) * ms); } - /** - * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids - * are monotonically increasing. - * - */ - static class WriteTracker { - private static AtomicLong operationCounter = new AtomicLong(1); - private Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class); - - WriteTracker() { - for (TabletType ttype : TabletType.values()) { - inProgressWrites.put(ttype, new TreeSet<Long>()); - } - } + private final SessionManager sessionManager; - synchronized long startWrite(TabletType ttype) { - long operationId = operationCounter.getAndIncrement(); - inProgressWrites.get(ttype).add(operationId); - return operationId; - } - - synchronized void finishWrite(long operationId) { - if (operationId == -1) - return; + private final WriteTracker writeTracker = new WriteTracker(); - boolean removed = false; + private final RowLocks rowLocks = new RowLocks(); - for (TabletType ttype : TabletType.values()) { - removed = inProgressWrites.get(ttype).remove(operationId); - if (removed) - break; - } - - if (!removed) { - throw new IllegalArgumentException("Attempted to finish write not in progress, operationId " + operationId); - } - - this.notifyAll(); - } - - synchronized void waitForWrites(TabletType ttype) { - long operationId = operationCounter.getAndIncrement(); - while (inProgressWrites.get(ttype).floor(operationId) != null) { - try { - this.wait(); - } catch (InterruptedException e) { - log.error(e, e); - } - } - } - - public long startWrite(Set<Tablet> keySet) { - if (keySet.size() == 0) - return -1; - - ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size()); - - for (Tablet tablet : keySet) - extents.add(tablet.getExtent()); - - return startWrite(TabletType.type(extents)); - } - } - - public AccumuloConfiguration getSystemConfiguration() { - return serverConfig.getConfiguration(); - } - - TransactionWatcher watcher = new TransactionWatcher(); + private final AtomicLong totalQueuedMutationSize = new AtomicLong(0); + private final ReentrantLock recoveryLock = new ReentrantLock(true); private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface { http://git-wip-us.apache.org/repos/asf/accumulo/blob/01cdd020/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java index 0a8a9e3,0000000..4f38655 mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java @@@ -1,432 -1,0 +1,435 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.tablet; + +import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; +import org.apache.accumulo.core.iterators.system.DeletingIterator; +import org.apache.accumulo.core.iterators.system.MultiIterator; +import org.apache.accumulo.core.iterators.system.TimeSettingIterator; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.trace.Span; +import org.apache.accumulo.core.trace.Trace; +import org.apache.accumulo.core.util.LocalityGroupUtil; +import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; +import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.problems.ProblemReport; +import org.apache.accumulo.server.problems.ProblemReportingIterator; +import org.apache.accumulo.server.problems.ProblemReports; +import org.apache.accumulo.server.problems.ProblemType; +import org.apache.accumulo.tserver.InMemoryMap; +import org.apache.accumulo.tserver.MinorCompactionReason; +import org.apache.accumulo.tserver.TabletIteratorEnvironment; +import org.apache.accumulo.tserver.compaction.MajorCompactionReason; +import org.apache.hadoop.fs.FileSystem; ++import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Compactor implements Callable<CompactionStats> { + private static final Logger log = LoggerFactory.getLogger(Compactor.class); + private static final AtomicLong nextCompactorID = new AtomicLong(0); + + public static class CompactionCanceledException extends Exception { + private static final long serialVersionUID = 1L; + } + + public interface CompactionEnv { + + boolean isCompactionEnabled(); + + IteratorScope getIteratorScope(); + } + + private final Map<FileRef,DataFileValue> filesToCompact; + private final InMemoryMap imm; + private final FileRef outputFile; + private final boolean propogateDeletes; + private final AccumuloConfiguration acuTableConf; + private final CompactionEnv env; + private final VolumeManager fs; + protected final KeyExtent extent; + private final List<IteratorSetting> iterators; + + // things to report + private String currentLocalityGroup = ""; + private final long startTime; + + private int reason; + + private final AtomicLong entriesRead = new AtomicLong(0); + private final AtomicLong entriesWritten = new AtomicLong(0); + private final DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); + + // a unique id to identify a compactor + private final long compactorID = nextCompactorID.getAndIncrement(); + protected volatile Thread thread; + private final AccumuloServerContext context; + + public long getCompactorID() { + return compactorID; + } + + private synchronized void setLocalityGroup(String name) { + this.currentLocalityGroup = name; + } + + public synchronized String getCurrentLocalityGroup() { + return currentLocalityGroup; + } + + private void clearStats() { + entriesRead.set(0); + entriesWritten.set(0); + } + + protected static final Set<Compactor> runningCompactions = Collections.synchronizedSet(new HashSet<Compactor>()); + + public static List<CompactionInfo> getRunningCompactions() { + ArrayList<CompactionInfo> compactions = new ArrayList<CompactionInfo>(); + + synchronized (runningCompactions) { + for (Compactor compactor : runningCompactions) { + compactions.add(new CompactionInfo(compactor)); + } + } + + return compactions; + } + + public Compactor(AccumuloServerContext context, Tablet tablet, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, + boolean propogateDeletes, CompactionEnv env, List<IteratorSetting> iterators, int reason, AccumuloConfiguration tableConfiguation) { + this.context = context; + this.extent = tablet.getExtent(); + this.fs = tablet.getTabletServer().getFileSystem(); + this.acuTableConf = tableConfiguation; + this.filesToCompact = files; + this.imm = imm; + this.outputFile = outputFile; + this.propogateDeletes = propogateDeletes; + this.env = env; + this.iterators = iterators; + this.reason = reason; + + startTime = System.currentTimeMillis(); + } + + public VolumeManager getFileSystem() { + return fs; + } + + KeyExtent getExtent() { + return extent; + } + + String getOutputFile() { + return outputFile.toString(); + } + + MajorCompactionReason getMajorCompactionReason() { + return MajorCompactionReason.values()[reason]; + } + + @Override + public CompactionStats call() throws IOException, CompactionCanceledException { + + FileSKVWriter mfw = null; + + CompactionStats majCStats = new CompactionStats(); + + boolean remove = runningCompactions.add(this); + + clearStats(); + ++ final Path outputFilePath = outputFile.path(); ++ final String outputFilePathName = outputFilePath.toString(); + String oldThreadName = Thread.currentThread().getName(); + String newThreadName = "MajC compacting " + extent.toString() + " started " + dateFormatter.format(new Date()) + " file: " + outputFile; + Thread.currentThread().setName(newThreadName); + thread = Thread.currentThread(); + try { + FileOperations fileFactory = FileOperations.getInstance(); - FileSystem ns = this.fs.getVolumeByPath(outputFile.path()).getFileSystem(); - mfw = fileFactory.openWriter(outputFile.path().toString(), ns, ns.getConf(), acuTableConf); ++ FileSystem ns = this.fs.getVolumeByPath(outputFilePath).getFileSystem(); ++ mfw = fileFactory.openWriter(outputFilePathName, ns, ns.getConf(), acuTableConf); + + Map<String,Set<ByteSequence>> lGroups; + try { + lGroups = LocalityGroupUtil.getLocalityGroups(acuTableConf); + } catch (LocalityGroupConfigurationError e) { + throw new IOException(e); + } + + long t1 = System.currentTimeMillis(); + + HashSet<ByteSequence> allColumnFamilies = new HashSet<ByteSequence>(); + + if (mfw.supportsLocalityGroups()) { + for (Entry<String,Set<ByteSequence>> entry : lGroups.entrySet()) { + setLocalityGroup(entry.getKey()); + compactLocalityGroup(entry.getKey(), entry.getValue(), true, mfw, majCStats); + allColumnFamilies.addAll(entry.getValue()); + } + } + + setLocalityGroup(""); + compactLocalityGroup(null, allColumnFamilies, false, mfw, majCStats); + + long t2 = System.currentTimeMillis(); + + FileSKVWriter mfwTmp = mfw; + mfw = null; // set this to null so we do not try to close it again in finally if the close fails + mfwTmp.close(); // if the close fails it will cause the compaction to fail + + // Verify the file, since hadoop 0.20.2 sometimes lies about the success of close() + try { - FileSKVIterator openReader = fileFactory.openReader(outputFile.path().toString(), false, ns, ns.getConf(), acuTableConf); ++ FileSKVIterator openReader = fileFactory.openReader(outputFilePathName, false, ns, ns.getConf(), acuTableConf); + openReader.close(); + } catch (IOException ex) { + log.error("Verification of successful compaction fails!!! " + extent + " " + outputFile, ex); + throw ex; + } + + log.debug(String.format("Compaction %s %,d read | %,d written | %,6d entries/sec | %6.3f secs", extent, majCStats.getEntriesRead(), + majCStats.getEntriesWritten(), (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0)); + - majCStats.setFileSize(fileFactory.getFileSize(outputFile.path().toString(), ns, ns.getConf(), acuTableConf)); ++ majCStats.setFileSize(fileFactory.getFileSize(outputFilePathName, ns, ns.getConf(), acuTableConf)); + return majCStats; + } catch (IOException e) { + log.error("{}", e.getMessage(), e); + throw e; + } catch (RuntimeException e) { + log.error("{}", e.getMessage(), e); + throw e; + } finally { + Thread.currentThread().setName(oldThreadName); + if (remove) { + thread = null; + runningCompactions.remove(this); + } + + try { + if (mfw != null) { + // compaction must not have finished successfully, so close its output file + try { + mfw.close(); + } finally { + if (!fs.deleteRecursively(outputFile.path())) + if (fs.exists(outputFile.path())) + log.error("Unable to delete " + outputFile); + } + } + } catch (IOException e) { + log.warn("{}", e.getMessage(), e); + } catch (RuntimeException exception) { + log.warn("{}", exception.getMessage(), exception); + } + } + } + + private List<SortedKeyValueIterator<Key,Value>> openMapDataFiles(String lgName, ArrayList<FileSKVIterator> readers) throws IOException { + + List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size()); + + for (FileRef mapFile : filesToCompact.keySet()) { + try { + + FileOperations fileFactory = FileOperations.getInstance(); + FileSystem fs = this.fs.getVolumeByPath(mapFile.path()).getFileSystem(); + FileSKVIterator reader; + + reader = fileFactory.openReader(mapFile.path().toString(), false, fs, fs.getConf(), acuTableConf); + + readers.add(reader); + + SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(context, extent.getTableId().toString(), mapFile.path().toString(), false, reader); + + if (filesToCompact.get(mapFile).isTimeSet()) { + iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime()); + } + + iters.add(iter); + + } catch (Throwable e) { + + ProblemReports.getInstance(context).report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile.path().toString(), e)); + + log.warn("Some problem opening map file {} {}", mapFile, e.getMessage(), e); + // failed to open some map file... close the ones that were opened + for (FileSKVIterator reader : readers) { + try { + reader.close(); + } catch (Throwable e2) { + log.warn("Failed to close map file", e2); + } + } + + readers.clear(); + + if (e instanceof IOException) + throw (IOException) e; + throw new IOException("Failed to open map data files", e); + } + } + + return iters; + } + + private void compactLocalityGroup(String lgName, Set<ByteSequence> columnFamilies, boolean inclusive, FileSKVWriter mfw, CompactionStats majCStats) + throws IOException, CompactionCanceledException { + ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(filesToCompact.size()); + Span span = Trace.start("compact"); + try { + long entriesCompacted = 0; + List<SortedKeyValueIterator<Key,Value>> iters = openMapDataFiles(lgName, readers); + + if (imm != null) { + iters.add(imm.compactionIterator()); + } + + CountingIterator citr = new CountingIterator(new MultiIterator(iters, extent.toDataRange()), entriesRead); + DeletingIterator delIter = new DeletingIterator(citr, propogateDeletes); + ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter); + + // if(env.getIteratorScope() ) + + TabletIteratorEnvironment iterEnv; + if (env.getIteratorScope() == IteratorScope.majc) + iterEnv = new TabletIteratorEnvironment(IteratorScope.majc, !propogateDeletes, acuTableConf); + else if (env.getIteratorScope() == IteratorScope.minc) + iterEnv = new TabletIteratorEnvironment(IteratorScope.minc, acuTableConf); + else + throw new IllegalArgumentException(); + + SortedKeyValueIterator<Key,Value> itr = iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(env.getIteratorScope(), cfsi, extent, acuTableConf, + iterators, iterEnv)); + + itr.seek(extent.toDataRange(), columnFamilies, inclusive); + + if (!inclusive) { + mfw.startDefaultLocalityGroup(); + } else { + mfw.startNewLocalityGroup(lgName, columnFamilies); + } + + Span write = Trace.start("write"); + try { + while (itr.hasTop() && env.isCompactionEnabled()) { + mfw.append(itr.getTopKey(), itr.getTopValue()); + itr.next(); + entriesCompacted++; + + if (entriesCompacted % 1024 == 0) { + // Periodically update stats, do not want to do this too often since its volatile + entriesWritten.addAndGet(1024); + } + } + + if (itr.hasTop() && !env.isCompactionEnabled()) { + // cancel major compaction operation + try { + try { + mfw.close(); + } catch (IOException e) { + log.error("{}", e.getMessage(), e); + } + fs.deleteRecursively(outputFile.path()); + } catch (Exception e) { + log.warn("Failed to delete Canceled compaction output file " + outputFile, e); + } + throw new CompactionCanceledException(); + } + + } finally { + CompactionStats lgMajcStats = new CompactionStats(citr.getCount(), entriesCompacted); + majCStats.add(lgMajcStats); + write.stop(); + } + + } finally { + // close sequence files opened + for (FileSKVIterator reader : readers) { + try { + reader.close(); + } catch (Throwable e) { + log.warn("Failed to close map file", e); + } + } + span.stop(); + } + } + + Collection<FileRef> getFilesToCompact() { + return filesToCompact.keySet(); + } + + boolean hasIMM() { + return imm != null; + } + + boolean willPropogateDeletes() { + return propogateDeletes; + } + + long getEntriesRead() { + return entriesRead.get(); + } + + long getEntriesWritten() { + return entriesWritten.get(); + } + + long getStartTime() { + return startTime; + } + + Iterable<IteratorSetting> getIterators() { + return this.iterators; + } + + MinorCompactionReason getMinCReason() { + return MinorCompactionReason.values()[reason]; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01cdd020/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java index 490ecd3,0000000..2aa772f mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java @@@ -1,147 -1,0 +1,148 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.tablet; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Random; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.server.problems.ProblemReport; +import org.apache.accumulo.server.problems.ProblemReports; +import org.apache.accumulo.server.problems.ProblemType; +import org.apache.accumulo.tserver.InMemoryMap; +import org.apache.accumulo.tserver.MinorCompactionReason; +import org.apache.accumulo.tserver.TabletServer; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MinorCompactor extends Compactor { + + private static final Logger log = LoggerFactory.getLogger(MinorCompactor.class); + + private static final Map<FileRef,DataFileValue> EMPTY_MAP = Collections.emptyMap(); + + private static Map<FileRef,DataFileValue> toFileMap(FileRef mergeFile, DataFileValue dfv) { + if (mergeFile == null) + return EMPTY_MAP; + + return Collections.singletonMap(mergeFile, dfv); + } + + private final TabletServer tabletServer; + + public MinorCompactor(TabletServer tabletServer, Tablet tablet, InMemoryMap imm, FileRef mergeFile, DataFileValue dfv, FileRef outputFile, + MinorCompactionReason mincReason, TableConfiguration tableConfig) { + super(tabletServer, tablet, toFileMap(mergeFile, dfv), imm, outputFile, true, new CompactionEnv() { + + @Override + public boolean isCompactionEnabled() { + return true; + } + + @Override + public IteratorScope getIteratorScope() { + return IteratorScope.minc; + } + }, Collections.<IteratorSetting> emptyList(), mincReason.ordinal(), tableConfig); + this.tabletServer = tabletServer; + } + + private boolean isTableDeleting() { + try { + return Tables.getTableState(tabletServer.getInstance(), extent.getTableId().toString()) == TableState.DELETING; + } catch (Exception e) { + log.warn("Failed to determine if table " + extent.getTableId() + " was deleting ", e); + return false; // can not get positive confirmation that its deleting. + } + } + + @Override + public CompactionStats call() { - log.debug("Begin minor compaction " + getOutputFile() + " " + getExtent()); ++ final String outputFileName = getOutputFile(); ++ log.debug("Begin minor compaction " + outputFileName + " " + getExtent()); + + // output to new MapFile with a temporary name + int sleepTime = 100; + double growthFactor = 4; + int maxSleepTime = 1000 * 60 * 3; // 3 minutes + boolean reportedProblem = false; + + runningCompactions.add(this); + try { + do { + try { + CompactionStats ret = super.call(); + + // log.debug(String.format("MinC %,d recs in | %,d recs out | %,d recs/sec | %6.3f secs | %,d bytes ",map.size(), entriesCompacted, + // (int)(map.size()/((t2 - t1)/1000.0)), (t2 - t1)/1000.0, estimatedSizeInBytes())); + + if (reportedProblem) { - ProblemReports.getInstance(tabletServer).deleteProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile()); ++ ProblemReports.getInstance(tabletServer).deleteProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, outputFileName); + } + + return ret; + } catch (IOException e) { - log.warn("MinC failed ({}) to create {} retrying ...", e.getMessage(), getOutputFile()); - ProblemReports.getInstance(tabletServer).report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e)); ++ log.warn("MinC failed ({}) to create {} retrying ...", e.getMessage(), outputFileName); ++ ProblemReports.getInstance(tabletServer).report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, outputFileName, e)); + reportedProblem = true; + } catch (RuntimeException e) { + // if this is coming from a user iterator, it is possible that the user could change the iterator config and that the + // minor compaction would succeed - log.warn("MinC failed ({}) to create {} retrying ...", e.getMessage(), getOutputFile(), e); - ProblemReports.getInstance(tabletServer).report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e)); ++ log.warn("MinC failed ({}) to create {} retrying ...", e.getMessage(), outputFileName, e); ++ ProblemReports.getInstance(tabletServer).report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, outputFileName, e)); + reportedProblem = true; + } catch (CompactionCanceledException e) { + throw new IllegalStateException(e); + } + + Random random = new Random(); + + int sleep = sleepTime + random.nextInt(sleepTime); + log.debug("MinC failed sleeping " + sleep + " ms before retrying"); + UtilWaitThread.sleep(sleep); + sleepTime = (int) Math.round(Math.min(maxSleepTime, sleepTime * growthFactor)); + + // clean up + try { - if (getFileSystem().exists(new Path(getOutputFile()))) { - getFileSystem().deleteRecursively(new Path(getOutputFile())); ++ if (getFileSystem().exists(new Path(outputFileName))) { ++ getFileSystem().deleteRecursively(new Path(outputFileName)); + } + } catch (IOException e) { - log.warn("Failed to delete failed MinC file {} {}", getOutputFile(), e.getMessage()); ++ log.warn("Failed to delete failed MinC file {} {}", outputFileName, e.getMessage()); + } + + if (isTableDeleting()) + return new CompactionStats(0, 0); + + } while (true); + } finally { + thread = null; + runningCompactions.remove(this); + } + } + +}