ACCUMULO-2041 finished state management in Tablet
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f280e971 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f280e971 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f280e971 Branch: refs/heads/ACCUMULO-378 Commit: f280e9713ca3016cec3c082321774d579c86d51e Parents: 731abce Author: Eric C. Newton <eric.new...@gmail.com> Authored: Tue Jun 3 16:42:43 2014 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Tue Jun 3 16:42:43 2014 -0400 ---------------------------------------------------------------------- .../accumulo/server/tablets/TabletTime.java | 1 - .../accumulo/tserver/TabletStatsKeeper.java | 1 + .../accumulo/tserver/tablet/CompactionInfo.java | 16 +++ .../apache/accumulo/tserver/tablet/Tablet.java | 142 +++++++++---------- 4 files changed, 85 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f280e971/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java index 7f6dcf7..e3fd8f3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java +++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java @@ -45,7 +45,6 @@ public abstract class TabletTime { public abstract String getMetadataValue(); - // abstract long setUpdateTimes(Mutation mutation); public abstract long setUpdateTimes(List<Mutation> mutations); public abstract long getTime(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/f280e971/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java index d914ac6..40906df 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java @@ -22,6 +22,7 @@ import org.apache.accumulo.server.util.ActionStatsUpdator; public class TabletStatsKeeper { + // suspect we need more synchronization in this class private ActionStats major = new ActionStats(); private ActionStats minor = new ActionStats(); private ActionStats split = new ActionStats(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/f280e971/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java index ab57d65..8e9fb9b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.accumulo.tserver.tablet; import java.util.ArrayList; http://git-wip-us.apache.org/repos/asf/accumulo/blob/f280e971/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index dc2fc4d..2be00fe 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -143,7 +143,7 @@ import org.apache.zookeeper.KeeperException.NoNodeException; * */ public class Tablet implements TabletCommitter { - static final Logger log = Logger.getLogger(Tablet.class); + static private final Logger log = Logger.getLogger(Tablet.class); static private final List<LogEntry> NO_LOG_ENTRIES = Collections.emptyList(); private final TabletServer tabletServer; @@ -166,23 +166,27 @@ public class Tablet implements TabletCommitter { private final AtomicLong dataSourceDeletions = new AtomicLong(0); public long getDataSourceDeletions() { return dataSourceDeletions.get(); } private final Set<ScanDataSource> activeScans = new HashSet<ScanDataSource>(); + + private static enum CloseState { + OPEN, + CLOSING, + CLOSED, + COMPLETE + } - private volatile boolean closing = false; - private boolean closed = false; - private boolean closeComplete = false; + private volatile CloseState closeState = CloseState.OPEN; private boolean updatingFlushID = false; private long lastFlushID = -1; private long lastCompactID = -1; - private volatile boolean majorCompactionInProgress = false; - private volatile boolean majorCompactionWaitingToStart = false; - private Set<MajorCompactionReason> majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class)); - - private volatile boolean minorCompactionInProgress = false; - private volatile boolean minorCompactionWaitingToStart = false; + static enum CompactionState { WAITING_TO_START, IN_PROGRESS }; + private volatile CompactionState minorCompactionState = null; + private volatile CompactionState majorCompactionState = null; + private final Set<MajorCompactionReason> majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class)); + private final AtomicReference<ConstraintChecker> constraintChecker = new AtomicReference<ConstraintChecker>(); private int writesInProgress = 0; @@ -220,7 +224,7 @@ public class Tablet implements TabletCommitter { return logId; } - public class LookupResult { + public static class LookupResult { public List<Range> unfinishedRanges = new ArrayList<Range>(); public long bytesAdded = 0; public long dataSize = 0; @@ -228,7 +232,7 @@ public class Tablet implements TabletCommitter { } FileRef getNextMapFilename(String prefix) throws IOException { - String extension = FileOperations.getNewFileExtension(this.tableConfiguration); + String extension = FileOperations.getNewFileExtension(tableConfiguration); checkTabletDir(); return new FileRef(location.toString() + "/" + prefix + UniqueNameAllocator.getInstance().getNextName() + "." + extension); } @@ -237,18 +241,18 @@ public class Tablet implements TabletCommitter { if (!tableDirChecked) { FileStatus[] files = null; try { - files = getTabletServer().getFileSystem().listStatus(this.location); + files = getTabletServer().getFileSystem().listStatus(location); } catch (FileNotFoundException ex) { // ignored } if (files == null) { - if (this.location.getName().startsWith("c-")) - log.debug("Tablet " + extent + " had no dir, creating " + this.location); // its a clone dir... + if (location.getName().startsWith("c-")) + log.debug("Tablet " + extent + " had no dir, creating " + location); // its a clone dir... else - log.warn("Tablet " + extent + " had no dir, creating " + this.location); + log.warn("Tablet " + extent + " had no dir, creating " + location); - getTabletServer().getFileSystem().mkdirs(this.location); + getTabletServer().getFileSystem().mkdirs(location); } tableDirChecked = true; } @@ -524,7 +528,7 @@ public class Tablet implements TabletCommitter { configObserver.propertiesChanged(); if (!logEntries.isEmpty()) { - log.info("Starting Write-Ahead Log recovery for " + this.extent); + log.info("Starting Write-Ahead Log recovery for " + extent); final long[] count = new long[2]; final CommitSession commitSession = getTabletMemory().getCommitSession(); count[1] = Long.MIN_VALUE; @@ -942,7 +946,7 @@ public class Tablet implements TabletCommitter { if (lastFlushID >= tableFlushID) return; - if (closing || closed || getTabletMemory().memoryReservedForMinC()) + if (isClosing() || isClosed() || getTabletMemory().memoryReservedForMinC()) return; if (getTabletMemory().getMemTable().getNumEntries() == 0) { @@ -1022,15 +1026,14 @@ public class Tablet implements TabletCommitter { synchronized (this) { t1 = System.currentTimeMillis(); - if (closing || closed || majorCompactionWaitingToStart || getTabletMemory().memoryReservedForMinC() || getTabletMemory().getMemTable().getNumEntries() == 0 + if (isClosing() || isClosed() || majorCompactionState == CompactionState.WAITING_TO_START || getTabletMemory().memoryReservedForMinC() || getTabletMemory().getMemTable().getNumEntries() == 0 || updatingFlushID) { logMessage = new StringBuilder(); logMessage.append(extent.toString()); - logMessage.append(" closing " + closing); - logMessage.append(" closed " + closed); - logMessage.append(" majorCompactionWaitingToStart " + majorCompactionWaitingToStart); + logMessage.append(" closeState " + closeState); + logMessage.append(" majorCompactionState " + majorCompactionState); if (getTabletMemory() != null) logMessage.append(" tabletMemory.memoryReservedForMinC() " + getTabletMemory().memoryReservedForMinC()); if (getTabletMemory() != null && getTabletMemory().getMemTable() != null) @@ -1145,8 +1148,7 @@ public class Tablet implements TabletCommitter { throw new IllegalStateException("waitingForLogs < 0 " + writesInProgress); } - if (closed || getTabletMemory() == null) { - // log.debug("tablet closed, can't commit"); + if (isClosed() || getTabletMemory() == null) { return null; } @@ -1217,7 +1219,7 @@ public class Tablet implements TabletCommitter { throw new IllegalStateException("waitingForLogs <= 0 " + writesInProgress); } - if (closeComplete || getTabletMemory() == null) { + if (isCloseComplete() || getTabletMemory() == null) { throw new IllegalStateException("aborting commit when tablet is closed"); } @@ -1245,7 +1247,7 @@ public class Tablet implements TabletCommitter { throw new IllegalStateException("commiting mutations after logging, but not waiting for any log messages"); } - if (closed && closeComplete) { + if (isCloseComplete()) { throw new IllegalStateException("tablet closed with outstanding messages to the logger"); } @@ -1284,28 +1286,24 @@ public class Tablet implements TabletCommitter { MinorCompactionTask mct = null; synchronized (this) { - if (closed || closing || closeComplete) { - String msg = "Tablet " + getExtent() + " already"; - if (closed) - msg += " closed"; - if (closing) - msg += " closing"; - if (closeComplete) - msg += " closeComplete"; + if (isClosed() || isClosing() || isCloseComplete()) { + String msg = "Tablet " + getExtent() + " already " + closeState; throw new IllegalStateException(msg); } // enter the closing state, no splits, minor, or major compactions can start // should cause running major compactions to stop - closing = true; + closeState = CloseState.CLOSING; this.notifyAll(); // determines if inserts and queries can still continue while minor compacting - closed = disableWrites; + if (disableWrites) { + closeState = CloseState.CLOSING; + } // wait for major compactions to finish, setting closing to // true should cause any running major compactions to abort - while (majorCompactionInProgress) { + while (majorCompactionRunning()) { try { this.wait(50); } catch (InterruptedException e) { @@ -1349,9 +1347,8 @@ public class Tablet implements TabletCommitter { synchronized void completeClose(boolean saveState, boolean completeClose) throws IOException { - if (!closing || closeComplete || closeCompleting) { - throw new IllegalStateException("closing = " + closing + " closed = " + closed + " closeComplete = " + closeComplete + " closeCompleting = " - + closeCompleting); + if (!isClosing() || isCloseComplete() || closeCompleting) { + throw new IllegalStateException("closeState = " + closeState); } log.debug("completeClose(saveState=" + saveState + " completeClose=" + completeClose + ") " + getExtent()); @@ -1359,7 +1356,7 @@ public class Tablet implements TabletCommitter { // ensure this method is only called once, also guards against multiple // threads entering the method at the same time closeCompleting = true; - closed = true; + closeState = CloseState.CLOSED; // modify dataSourceDeletions so scans will try to switch data sources and fail because the tablet is closed dataSourceDeletions.incrementAndGet(); @@ -1422,7 +1419,8 @@ public class Tablet implements TabletCommitter { tableConfiguration.getNamespaceConfiguration().removeObserver(configObserver); tableConfiguration.removeObserver(configObserver); - closeComplete = completeClose; + if (completeClose) + closeState = CloseState.COMPLETE; } private void closeConsistencyCheck() { @@ -1491,7 +1489,7 @@ public class Tablet implements TabletCommitter { public synchronized boolean initiateMajorCompaction(MajorCompactionReason reason) { - if (closing || closed || !needsMajorCompaction(reason) || majorCompactionInProgress || majorCompactionQueued.contains(reason)) { + if (isClosing() || isClosed() || !needsMajorCompaction(reason) || majorCompactionRunning() || majorCompactionQueued.contains(reason)) { return false; } @@ -1507,7 +1505,7 @@ public class Tablet implements TabletCommitter { * */ public boolean needsMajorCompaction(MajorCompactionReason reason) { - if (majorCompactionInProgress) + if (majorCompactionRunning()) return false; if (reason == MajorCompactionReason.CHOP || reason == MajorCompactionReason.USER) return true; @@ -1678,7 +1676,7 @@ public class Tablet implements TabletCommitter { public synchronized boolean needsSplit() { boolean ret; - if (closing || closed) + if (isClosing() || isClosed()) ret = false; else ret = findSplitRow(getDatafileManager().getFiles()) != null; @@ -1689,7 +1687,7 @@ public class Tablet implements TabletCommitter { // BEGIN PRIVATE METHODS RELATED TO MAJOR COMPACTION private boolean isCompactionEnabled() { - return !closing && !getTabletServer().isMajorCompactionDisabled(); + return !isClosing() && !getTabletServer().isMajorCompactionDisabled(); } private CompactionStats _majorCompact(MajorCompactionReason reason) throws IOException, CompactionCanceledException { @@ -1725,13 +1723,13 @@ public class Tablet implements TabletCommitter { t1 = System.currentTimeMillis(); - majorCompactionWaitingToStart = true; + majorCompactionState = CompactionState.WAITING_TO_START; getTabletMemory().waitForMinC(); t2 = System.currentTimeMillis(); - majorCompactionWaitingToStart = false; + majorCompactionState = null; notifyAll(); VolumeManager fs = getTabletServer().getFileSystem(); @@ -1953,11 +1951,11 @@ public class Tablet implements TabletCommitter { // check that compaction is still needed - defer to splitting majorCompactionQueued.remove(reason); - if (closing || closed || !needsMajorCompaction(reason) || majorCompactionInProgress || needsSplit()) { + if (isClosing() || isClosed ()|| !needsMajorCompaction(reason) || majorCompactionRunning() || needsSplit()) { return null; } - majorCompactionInProgress = true; + majorCompactionState = CompactionState.WAITING_TO_START; } try { @@ -1978,7 +1976,7 @@ public class Tablet implements TabletCommitter { // ensure we always reset boolean, even // when an exception is thrown synchronized (this) { - majorCompactionInProgress = false; + majorCompactionState = null; this.notifyAll(); } @@ -2033,27 +2031,27 @@ public class Tablet implements TabletCommitter { } public synchronized boolean isClosing() { - return closing; + return closeState == CloseState.CLOSING; } public synchronized boolean isClosed() { - return closed; + return closeState == CloseState.CLOSED; } public synchronized boolean isCloseComplete() { - return closeComplete; + return closeState == CloseState.COMPLETE; } public boolean majorCompactionRunning() { - return this.majorCompactionInProgress; + return majorCompactionState == CompactionState.IN_PROGRESS; } public boolean isMinorCompactionQueued() { - return minorCompactionWaitingToStart; + return minorCompactionState == CompactionState.WAITING_TO_START; } public boolean isMinorCompactionRunning() { - return minorCompactionInProgress; + return minorCompactionState == CompactionState.IN_PROGRESS; } public boolean isMajorCompactionQueued() { @@ -2104,11 +2102,11 @@ public class Tablet implements TabletCommitter { if (splitPoint == null || splitPoint.row == null) { log.info("had to abort split because splitRow was null"); - closing = false; + closeState = CloseState.OPEN; return null; } - closed = true; + closeState = CloseState.CLOSING; completeClose(true, false); Text midRow = splitPoint.row; @@ -2151,8 +2149,7 @@ public class Tablet implements TabletCommitter { log.debug(String.format("offline split time : %6.2f secs", (t2 - t1) / 1000.0)); - closeComplete = true; - + closeState = CloseState.COMPLETE; return newTablets; } } @@ -2213,7 +2210,7 @@ public class Tablet implements TabletCommitter { // Don't do it if we spent too long waiting for the lock long now = System.currentTimeMillis(); synchronized (this) { - if (closed) { + if (isClosed()) { throw new IOException("tablet " + extent + " is closed"); } @@ -2328,7 +2325,7 @@ public class Tablet implements TabletCommitter { try { synchronized (this) { - if (closed && closeComplete) { + if (isCloseComplete()) { throw new IllegalStateException("Can not update logs of closed tablet " + extent); } @@ -2409,12 +2406,12 @@ public class Tablet implements TabletCommitter { if (lastCompactID >= compactionId) return; - if (closing || closed || majorCompactionQueued.contains(MajorCompactionReason.USER) || majorCompactionInProgress) + if (isClosing() || isClosed() || majorCompactionQueued.contains(MajorCompactionReason.USER) || majorCompactionRunning()) return; if (getDatafileManager().getDatafileSizes().size() == 0) { // no files, so jsut update the metadata table - majorCompactionInProgress = true; + majorCompactionState = CompactionState.IN_PROGRESS; updateMetadata = true; lastCompactID = compactionId; } else @@ -2428,7 +2425,7 @@ public class Tablet implements TabletCommitter { MetadataTableUtil.updateTabletCompactID(extent, compactionId, SystemCredentials.get(), getTabletServer().getLock()); } finally { synchronized (this) { - majorCompactionInProgress = false; + majorCompactionState = null; this.notifyAll(); } } @@ -2535,20 +2532,19 @@ public class Tablet implements TabletCommitter { } public void minorCompactionWaitingToStart() { - minorCompactionWaitingToStart = true; + minorCompactionState = CompactionState.WAITING_TO_START; } public void minorCompactionStarted() { - minorCompactionWaitingToStart = false; - minorCompactionInProgress = true; + minorCompactionState = CompactionState.IN_PROGRESS; } public void minorCompactionComplete() { - minorCompactionInProgress = false; + minorCompactionState = null; } public boolean isMajorCompactionRunning() { - return majorCompactionInProgress; + return majorCompactionState == CompactionState.IN_PROGRESS; } public TabletStats getTabletStats() { @@ -2558,6 +2554,4 @@ public class Tablet implements TabletCommitter { public AtomicLong getScannedCounter() { return scannedCount; } - - }