http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java index 1d49647..659d877 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java @@ -52,23 +52,23 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; public class TabletLocatorImpl extends TabletLocator { - + private static final Logger log = Logger.getLogger(TabletLocatorImpl.class); - + // there seems to be a bug in TreeMap.tailMap related to // putting null in the treemap.. therefore instead of // putting null, put MAX_TEXT static final Text MAX_TEXT = new Text(); - + private static class EndRowComparator implements Comparator<Text>, Serializable { - + private static final long serialVersionUID = 1L; @Override public int compare(Text o1, Text o2) { - + int ret; - + if (o1 == MAX_TEXT) if (o2 == MAX_TEXT) ret = 0; @@ -78,21 +78,21 @@ public class TabletLocatorImpl extends TabletLocator { ret = -1; else ret = o1.compareTo(o2); - + return ret; } - + } - + static final EndRowComparator endRowComparator = new EndRowComparator(); - + protected Text tableId; protected TabletLocator parent; protected TreeMap<Text,TabletLocation> metaCache = new TreeMap<Text,TabletLocation>(endRowComparator); protected TabletLocationObtainer locationObtainer; private TabletServerLockChecker lockChecker; protected Text lastTabletRow; - + private TreeSet<KeyExtent> badExtents = new TreeSet<KeyExtent>(); private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock rLock = rwLock.readLock(); @@ -104,11 +104,11 @@ public class TabletLocatorImpl extends TabletLocator { */ TabletLocations lookupTablet(ClientContext context, TabletLocation src, Text row, Text stopRow, TabletLocator parent) throws AccumuloSecurityException, AccumuloException; - + List<TabletLocation> lookupTablets(ClientContext context, String tserver, Map<KeyExtent,List<Range>> map, TabletLocator parent) throws AccumuloSecurityException, AccumuloException; } - + public static interface TabletServerLockChecker { boolean isLockHeld(String tserver, String session); @@ -116,36 +116,36 @@ public class TabletLocatorImpl extends TabletLocator { } private class LockCheckerSession { - + private HashSet<Pair<String,String>> okLocks = new HashSet<Pair<String,String>>(); private HashSet<Pair<String,String>> invalidLocks = new HashSet<Pair<String,String>>(); - + private TabletLocation checkLock(TabletLocation tl) { // the goal of this class is to minimize calls out to lockChecker under that assumption that its a resource synchronized among many threads... want to // avoid fine grained synchronization when binning lots of mutations or ranges... remember decisions from the lockChecker in thread local unsynchronized // memory - + if (tl == null) return null; Pair<String,String> lock = new Pair<String,String>(tl.tablet_location, tl.tablet_session); - + if (okLocks.contains(lock)) return tl; - + if (invalidLocks.contains(lock)) return null; - + if (lockChecker.isLockHeld(tl.tablet_location, tl.tablet_session)) { okLocks.add(lock); return tl; } - + if (log.isTraceEnabled()) log.trace("Tablet server " + tl.tablet_location + " " + tl.tablet_session + " no longer holds its lock"); - + invalidLocks.add(lock); - + return null; } } @@ -155,34 +155,34 @@ public class TabletLocatorImpl extends TabletLocator { this.parent = parent; this.locationObtainer = tlo; this.lockChecker = tslc; - + this.lastTabletRow = new Text(tableId); lastTabletRow.append(new byte[] {'<'}, 0, 1); } - + @Override public <T extends Mutation> void binMutations(ClientContext context, List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - + OpTimer opTimer = null; if (log.isTraceEnabled()) opTimer = new OpTimer(log, Level.TRACE).start("Binning " + mutations.size() + " mutations for table " + tableId); - + ArrayList<T> notInCache = new ArrayList<T>(); Text row = new Text(); - + LockCheckerSession lcSession = new LockCheckerSession(); rLock.lock(); try { processInvalidated(context, lcSession); - + // for this to be efficient rows need to be in sorted order, but always sorting is slow... therefore only sort the // stuff not in the cache.... it is most efficient to pass _locateTablet rows in sorted order - + // For this to be efficient, need to avoid fine grained synchronization and fine grained logging. // Therefore methods called by this are not synchronized and should not log. - + for (T mutation : mutations) { row.set(mutation.getRow()); TabletLocation tl = locateTabletInCache(row); @@ -192,7 +192,7 @@ public class TabletLocatorImpl extends TabletLocator { } finally { rLock.unlock(); } - + if (notInCache.size() > 0) { Collections.sort(notInCache, new Comparator<Mutation>() { @Override @@ -200,7 +200,7 @@ public class TabletLocatorImpl extends TabletLocator { return WritableComparator.compareBytes(o1.getRow(), 0, o1.getRow().length, o2.getRow(), 0, o2.getRow().length); } }); - + wLock.lock(); try { boolean failed = false; @@ -211,11 +211,11 @@ public class TabletLocatorImpl extends TabletLocator { failures.add(mutation); continue; } - + row.set(mutation.getRow()); - + TabletLocation tl = _locateTablet(context, row, false, false, false, lcSession); - + if (tl == null || !addMutation(binnedMutations, mutation, tl, lcSession)) { failures.add(mutation); failed = true; @@ -233,7 +233,7 @@ public class TabletLocatorImpl extends TabletLocator { private <T extends Mutation> boolean addMutation(Map<String,TabletServerMutations<T>> binnedMutations, T mutation, TabletLocation tl, LockCheckerSession lcSession) { TabletServerMutations<T> tsm = binnedMutations.get(tl.tablet_location); - + if (tsm == null) { // do lock check once per tserver here to make binning faster boolean lockHeld = lcSession.checkLock(tl) != null; @@ -244,50 +244,50 @@ public class TabletLocatorImpl extends TabletLocator { return false; } } - + // its possible the same tserver could be listed with different sessions if (tsm.getSession().equals(tl.tablet_session)) { tsm.addMutation(tl.tablet_extent, mutation); return true; } - + return false; } - + private List<Range> binRanges(ClientContext context, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges, boolean useCache, LockCheckerSession lcSession) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { List<Range> failures = new ArrayList<Range>(); List<TabletLocation> tabletLocations = new ArrayList<TabletLocation>(); - + boolean lookupFailed = false; - + l1: for (Range range : ranges) { - + tabletLocations.clear(); - + Text startRow; - + if (range.getStartKey() != null) { startRow = range.getStartKey().getRow(); } else startRow = new Text(); - + TabletLocation tl = null; - + if (useCache) tl = lcSession.checkLock(locateTabletInCache(startRow)); else if (!lookupFailed) tl = _locateTablet(context, startRow, false, false, false, lcSession); - + if (tl == null) { failures.add(range); if (!useCache) lookupFailed = true; continue; } - + tabletLocations.add(tl); - + while (tl.tablet_extent.getEndRow() != null && !range.afterEndKey(new Key(tl.tablet_extent.getEndRow()).followingKey(PartialKey.ROW))) { if (useCache) { Text row = new Text(tl.tablet_extent.getEndRow()); @@ -296,7 +296,7 @@ public class TabletLocatorImpl extends TabletLocator { } else { tl = _locateTablet(context, tl.tablet_extent.getEndRow(), true, false, false, lcSession); } - + if (tl == null) { failures.add(range); if (!useCache) @@ -309,46 +309,46 @@ public class TabletLocatorImpl extends TabletLocator { for (TabletLocation tl2 : tabletLocations) { TabletLocatorImpl.addRange(binnedRanges, tl2.tablet_location, tl2.tablet_extent, range); } - + } - + return failures; } - + @Override public List<Range> binRanges(ClientContext context, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - + /* * For this to be efficient, need to avoid fine grained synchronization and fine grained logging. Therefore methods called by this are not synchronized and * should not log. */ - + OpTimer opTimer = null; if (log.isTraceEnabled()) opTimer = new OpTimer(log, Level.TRACE).start("Binning " + ranges.size() + " ranges for table " + tableId); - + LockCheckerSession lcSession = new LockCheckerSession(); List<Range> failures; rLock.lock(); try { processInvalidated(context, lcSession); - + // for this to be optimal, need to look ranges up in sorted order when // ranges are not present in cache... however do not want to always // sort ranges... therefore try binning ranges using only the cache // and sort whatever fails and retry - + failures = binRanges(context, ranges, binnedRanges, true, lcSession); } finally { rLock.unlock(); } - + if (failures.size() > 0) { // sort failures by range start key Collections.sort(failures); - + // try lookups again wLock.lock(); try { @@ -357,13 +357,13 @@ public class TabletLocatorImpl extends TabletLocator { wLock.unlock(); } } - + if (opTimer != null) opTimer.stop("Binned " + ranges.size() + " ranges for table " + tableId + " to " + binnedRanges.size() + " tservers in %DURATION%"); return failures; } - + @Override public void invalidateCache(KeyExtent failedExtent) { wLock.lock(); @@ -375,7 +375,7 @@ public class TabletLocatorImpl extends TabletLocator { if (log.isTraceEnabled()) log.trace("Invalidated extent=" + failedExtent); } - + @Override public void invalidateCache(Collection<KeyExtent> keySet) { wLock.lock(); @@ -387,11 +387,11 @@ public class TabletLocatorImpl extends TabletLocator { if (log.isTraceEnabled()) log.trace("Invalidated " + keySet.size() + " cache entries for table " + tableId); } - + @Override public void invalidateCache(Instance instance, String server) { int invalidatedCount = 0; - + wLock.lock(); try { for (TabletLocation cacheEntry : metaCache.values()) @@ -402,14 +402,14 @@ public class TabletLocatorImpl extends TabletLocator { } finally { wLock.unlock(); } - + lockChecker.invalidateCache(server); if (log.isTraceEnabled()) log.trace("invalidated " + invalidatedCount + " cache entries table=" + tableId + " server=" + server); - + } - + @Override public void invalidateCache() { int invalidatedCount; @@ -423,18 +423,18 @@ public class TabletLocatorImpl extends TabletLocator { if (log.isTraceEnabled()) log.trace("invalidated all " + invalidatedCount + " cache entries for table=" + tableId); } - + @Override public TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - + OpTimer opTimer = null; if (log.isTraceEnabled()) opTimer = new OpTimer(log, Level.TRACE).start("Locating tablet table=" + tableId + " row=" + TextUtil.truncate(row) + " skipRow=" + skipRow + " retry=" + retry); - + while (true) { - + LockCheckerSession lcSession = new LockCheckerSession(); TabletLocation tl = _locateTablet(context, row, skipRow, retry, true, lcSession); @@ -444,21 +444,21 @@ public class TabletLocatorImpl extends TabletLocator { log.trace("Failed to locate tablet containing row " + TextUtil.truncate(row) + " in table " + tableId + ", will retry..."); continue; } - + if (opTimer != null) opTimer.stop("Located tablet " + (tl == null ? null : tl.tablet_extent) + " at " + (tl == null ? null : tl.tablet_location) + " in %DURATION%"); - + return tl; } } - + private void lookupTabletLocation(ClientContext context, Text row, boolean retry, LockCheckerSession lcSession) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { Text metadataRow = new Text(tableId); metadataRow.append(new byte[] {';'}, 0, 1); metadataRow.append(row.getBytes(), 0, row.getLength()); TabletLocation ptl = parent.locateTablet(context, metadataRow, false, retry); - + if (ptl != null) { TabletLocations locations = locationObtainer.lookupTablet(context, ptl, metadataRow, lastTabletRow, parent); while (locations != null && locations.getLocations().isEmpty() && locations.getLocationless().isEmpty()) { @@ -475,19 +475,19 @@ public class TabletLocatorImpl extends TabletLocator { break; } } - + if (locations == null) return; - + // cannot assume the list contains contiguous key extents... so it is probably // best to deal with each extent individually - + Text lastEndRow = null; for (TabletLocation tabletLocation : locations.getLocations()) { - + KeyExtent ke = tabletLocation.tablet_extent; TabletLocation locToCache; - + // create new location if current prevEndRow == endRow if ((lastEndRow != null) && (ke.getPrevEndRow() != null) && ke.getPrevEndRow().equals(lastEndRow)) { locToCache = new TabletLocation(new KeyExtent(ke.getTableId(), ke.getEndRow(), lastEndRow), tabletLocation.tablet_location, @@ -495,35 +495,35 @@ public class TabletLocatorImpl extends TabletLocator { } else { locToCache = tabletLocation; } - + // save endRow for next iteration lastEndRow = locToCache.tablet_extent.getEndRow(); - + updateCache(locToCache, lcSession); } } - + } - + private void updateCache(TabletLocation tabletLocation, LockCheckerSession lcSession) { if (!tabletLocation.tablet_extent.getTableId().equals(tableId)) { // sanity check throw new IllegalStateException("Unexpected extent returned " + tableId + " " + tabletLocation.tablet_extent); } - + if (tabletLocation.tablet_location == null) { // sanity check throw new IllegalStateException("Cannot add null locations to cache " + tableId + " " + tabletLocation.tablet_extent); } - + if (!tabletLocation.tablet_extent.getTableId().equals(tableId)) { // sanity check throw new IllegalStateException("Cannot add other table ids to locations cache " + tableId + " " + tabletLocation.tablet_extent); } - + // clear out any overlapping extents in cache removeOverlapping(metaCache, tabletLocation.tablet_extent); - + // do not add to cache unless lock is held if (lcSession.checkLock(tabletLocation) == null) return; @@ -533,14 +533,14 @@ public class TabletLocatorImpl extends TabletLocator { if (er == null) er = MAX_TEXT; metaCache.put(er, tabletLocation); - + if (badExtents.size() > 0) removeOverlapping(badExtents, tabletLocation.tablet_extent); } - + static void removeOverlapping(TreeMap<Text,TabletLocation> metaCache, KeyExtent nke) { Iterator<Entry<Text,TabletLocation>> iter = null; - + if (nke.getPrevEndRow() == null) { iter = metaCache.entrySet().iterator(); } else { @@ -548,30 +548,30 @@ public class TabletLocatorImpl extends TabletLocator { SortedMap<Text,TabletLocation> tailMap = metaCache.tailMap(row); iter = tailMap.entrySet().iterator(); } - + while (iter.hasNext()) { Entry<Text,TabletLocation> entry = iter.next(); - + KeyExtent ke = entry.getValue().tablet_extent; - + if (stopRemoving(nke, ke)) { break; } - + iter.remove(); } } - + private static boolean stopRemoving(KeyExtent nke, KeyExtent ke) { return ke.getPrevEndRow() != null && nke.getEndRow() != null && ke.getPrevEndRow().compareTo(nke.getEndRow()) >= 0; } - + private static Text rowAfterPrevRow(KeyExtent nke) { Text row = new Text(nke.getPrevEndRow()); row.append(new byte[] {0}, 0, 1); return row; } - + static void removeOverlapping(TreeSet<KeyExtent> extents, KeyExtent nke) { for (KeyExtent overlapping : KeyExtent.findOverlapping(nke, extents)) { extents.remove(overlapping); @@ -579,9 +579,9 @@ public class TabletLocatorImpl extends TabletLocator { } private TabletLocation locateTabletInCache(Text row) { - + Entry<Text,TabletLocation> entry = metaCache.ceilingEntry(row); - + if (entry != null) { KeyExtent ke = entry.getValue().tablet_extent; if (ke.getPrevEndRow() == null || ke.getPrevEndRow().compareTo(row) < 0) { @@ -590,17 +590,17 @@ public class TabletLocatorImpl extends TabletLocator { } return null; } - + protected TabletLocation _locateTablet(ClientContext context, Text row, boolean skipRow, boolean retry, boolean lock, LockCheckerSession lcSession) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - + if (skipRow) { row = new Text(row); row.append(new byte[] {0}, 0, 1); } - + TabletLocation tl; - + if (lock) rLock.lock(); try { @@ -610,30 +610,30 @@ public class TabletLocatorImpl extends TabletLocator { if (lock) rLock.unlock(); } - + if (tl == null) { if (lock) wLock.lock(); try { // not in cache, so obtain info lookupTabletLocation(context, row, retry, lcSession); - + tl = lcSession.checkLock(locateTabletInCache(row)); } finally { if (lock) wLock.unlock(); } } - + return tl; } - + private void processInvalidated(ClientContext context, LockCheckerSession lcSession) throws AccumuloSecurityException, AccumuloException, TableNotFoundException { - + if (badExtents.size() == 0) return; - + final boolean writeLockHeld = rwLock.isWriteLockedByCurrentThread(); try { if (!writeLockHeld) { @@ -642,27 +642,27 @@ public class TabletLocatorImpl extends TabletLocator { if (badExtents.size() == 0) return; } - + List<Range> lookups = new ArrayList<Range>(badExtents.size()); - + for (KeyExtent be : badExtents) { lookups.add(be.toMetadataRange()); removeOverlapping(metaCache, be); } - + lookups = Range.mergeOverlapping(lookups); - + Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); - + parent.binRanges(context, lookups, binnedRanges); - + // randomize server order ArrayList<String> tabletServers = new ArrayList<String>(binnedRanges.keySet()); Collections.shuffle(tabletServers); - + for (String tserver : tabletServers) { List<TabletLocation> locations = locationObtainer.lookupTablets(context, tserver, binnedRanges.get(tserver), parent); - + for (TabletLocation tabletLocation : locations) { updateCache(tabletLocation, lcSession); } @@ -674,21 +674,21 @@ public class TabletLocatorImpl extends TabletLocator { } } } - + protected static void addRange(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, String location, KeyExtent ke, Range range) { Map<KeyExtent,List<Range>> tablets = binnedRanges.get(location); if (tablets == null) { tablets = new HashMap<KeyExtent,List<Range>>(); binnedRanges.put(location, tablets); } - + List<Range> tabletsRanges = tablets.get(ke); if (tabletsRanges == null) { tabletsRanges = new ArrayList<Range>(); tablets.put(ke, tabletsRanges); } - + tabletsRanges.add(range); } - + }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java index b88571a..d3b26dc 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java @@ -33,11 +33,11 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; public class TabletServerBatchDeleter extends TabletServerBatchReader implements BatchDeleter { - + private final ClientContext context; private String tableId; private BatchWriterConfig bwConfig; - + public TabletServerBatchDeleter(ClientContext context, String tableId, Authorizations authorizations, int numQueryThreads, BatchWriterConfig bwConfig) throws TableNotFoundException { super(context, tableId, authorizations, numQueryThreads); @@ -46,7 +46,7 @@ public class TabletServerBatchDeleter extends TabletServerBatchReader implements this.bwConfig = bwConfig; super.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, BatchDeleter.class.getName() + ".NOVALUE", SortedKeyIterator.class)); } - + @Override public void delete() throws MutationsRejectedException, TableNotFoundException { BatchWriter bw = null; @@ -65,5 +65,5 @@ public class TabletServerBatchDeleter extends TabletServerBatchReader implements bw.close(); } } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java index 2a79f05..a7422c2 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java @@ -34,25 +34,25 @@ import org.apache.log4j.Logger; public class TabletServerBatchReader extends ScannerOptions implements BatchScanner { private static final Logger log = Logger.getLogger(TabletServerBatchReader.class); - + private String table; private int numThreads; private ExecutorService queryThreadPool; - + private final ClientContext context; private ArrayList<Range> ranges; - + private Authorizations authorizations = Authorizations.EMPTY; private Throwable ex = null; - + private static int nextBatchReaderInstance = 1; - + private static synchronized int getNextBatchReaderInstance() { return nextBatchReaderInstance++; } - + private final int batchReaderInstance = getNextBatchReaderInstance(); - + public TabletServerBatchReader(ClientContext context, String table, Authorizations authorizations, int numQueryThreads) { checkArgument(context != null, "context is null"); checkArgument(table != null, "table is null"); @@ -61,18 +61,18 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan this.authorizations = authorizations; this.table = table; this.numThreads = numQueryThreads; - + queryThreadPool = new SimpleThreadPool(numQueryThreads, "batch scanner " + batchReaderInstance + "-"); - + ranges = null; ex = new Throwable(); } - + @Override public void close() { queryThreadPool.shutdownNow(); } - + /** * Warning: do not rely upon finalize to close this class. Finalize is not guaranteed to be called. */ @@ -83,31 +83,31 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan close(); } } - + @Override public void setRanges(Collection<Range> ranges) { if (ranges == null || ranges.size() == 0) { throw new IllegalArgumentException("ranges must be non null and contain at least 1 range"); } - + if (queryThreadPool.isShutdown()) { throw new IllegalStateException("batch reader closed"); } - + this.ranges = new ArrayList<Range>(ranges); - + } - + @Override public Iterator<Entry<Key,Value>> iterator() { if (ranges == null) { throw new IllegalStateException("ranges not set"); } - + if (queryThreadPool.isShutdown()) { throw new IllegalStateException("batch reader closed"); } - + return new TabletServerBatchReaderIterator(context, table, authorizations, ranges, numThreads, queryThreadPool, this, timeOut); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java index eb80f8b..df330ed 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java @@ -73,9 +73,9 @@ import org.apache.thrift.transport.TTransportException; import com.google.common.net.HostAndPort; public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value>> { - + private static final Logger log = Logger.getLogger(TabletServerBatchReaderIterator.class); - + private final ClientContext context; private final Instance instance; private final String table; @@ -83,30 +83,30 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value private final int numThreads; private final ExecutorService queryThreadPool; private final ScannerOptions options; - + private ArrayBlockingQueue<List<Entry<Key,Value>>> resultsQueue; private Iterator<Entry<Key,Value>> batchIterator; private List<Entry<Key,Value>> batch; private static final List<Entry<Key,Value>> LAST_BATCH = new ArrayList<Map.Entry<Key,Value>>(); private final Object nextLock = new Object(); - + private long failSleepTime = 100; - + private volatile Throwable fatalException = null; - + private Map<String,TimeoutTracker> timeoutTrackers; private Set<String> timedoutServers; private long timeout; - + private TabletLocator locator; - + public interface ResultReceiver { void receive(List<Entry<Key,Value>> entries); } - + public TabletServerBatchReaderIterator(ClientContext context, String table, Authorizations authorizations, ArrayList<Range> ranges, int numThreads, ExecutorService queryThreadPool, ScannerOptions scannerOptions, long timeout) { - + this.context = context; this.instance = context.getInstance(); this.table = table; @@ -115,24 +115,24 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value this.queryThreadPool = queryThreadPool; this.options = new ScannerOptions(scannerOptions); resultsQueue = new ArrayBlockingQueue<List<Entry<Key,Value>>>(numThreads); - + this.locator = new TimeoutTabletLocator(TabletLocator.getLocator(context, new Text(table)), timeout); - + timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchReaderIterator.TimeoutTracker>()); timedoutServers = Collections.synchronizedSet(new HashSet<String>()); this.timeout = timeout; - + if (options.fetchedColumns.size() > 0) { ArrayList<Range> ranges2 = new ArrayList<Range>(ranges.size()); for (Range range : ranges) { ranges2.add(range.bound(options.fetchedColumns.first(), options.fetchedColumns.last())); } - + ranges = ranges2; } - + ResultReceiver rr = new ResultReceiver() { - + @Override public void receive(List<Entry<Key,Value>> entries) { try { @@ -144,12 +144,12 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value log.warn("Failed to add Batch Scan result", e); fatalException = e; throw new RuntimeException(e); - + } } - + }; - + try { lookup(ranges, rr); } catch (RuntimeException re) { @@ -158,31 +158,31 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value throw new RuntimeException("Failed to create iterator", e); } } - + @Override public boolean hasNext() { synchronized (nextLock) { if (batch == LAST_BATCH) return false; - + if (batch != null && batchIterator.hasNext()) return true; - + // don't have one cached, try to cache one and return success try { batch = null; while (batch == null && fatalException == null && !queryThreadPool.isShutdown()) batch = resultsQueue.poll(1, TimeUnit.SECONDS); - + if (fatalException != null) if (fatalException instanceof RuntimeException) throw (RuntimeException) fatalException; else throw new RuntimeException(fatalException); - + if (queryThreadPool.isShutdown()) throw new RuntimeException("scanner closed"); - + batchIterator = batch.iterator(); return batch != LAST_BATCH; } catch (InterruptedException e) { @@ -190,7 +190,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value } } } - + @Override public Entry<Key,Value> next() { // if there's one waiting, or hasNext() can get one, return it @@ -201,33 +201,33 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value throw new NoSuchElementException(); } } - + @Override public void remove() { throw new UnsupportedOperationException(); } - + private synchronized void lookup(List<Range> ranges, ResultReceiver receiver) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { List<Column> columns = new ArrayList<Column>(options.fetchedColumns); ranges = Range.mergeOverlapping(ranges); - + Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); - + binRanges(locator, ranges, binnedRanges); - + doLookups(binnedRanges, receiver, columns); } - + private void binRanges(TabletLocator tabletLocator, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - + int lastFailureSize = Integer.MAX_VALUE; - + while (true) { - + binnedRanges.clear(); List<Range> failures = tabletLocator.binRanges(context, ranges, binnedRanges); - + if (failures.size() > 0) { // tried to only do table state checks when failures.size() == ranges.size(), however this did // not work because nothing ever invalidated entries in the tabletLocator cache... so even though @@ -238,9 +238,9 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value throw new TableDeletedException(table); else if (Tables.getTableState(instance, table) == TableState.OFFLINE) throw new TableOfflineException(instance, table); - + lastFailureSize = failures.size(); - + if (log.isTraceEnabled()) log.trace("Failed to bin " + failures.size() + " ranges, tablet locations were null, retrying in 100ms"); try { @@ -251,9 +251,9 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value } else { break; } - + } - + // truncate the ranges to within the tablets... this makes it easier to know what work // needs to be redone when failures occurs and tablets have merged or split Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<String,Map<KeyExtent,List<Range>>>(); @@ -268,47 +268,47 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value clippedRanges.add(tabletRange.clip(range)); } } - + binnedRanges.clear(); binnedRanges.putAll(binnedRanges2); } - + private void processFailures(Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { if (log.isTraceEnabled()) log.trace("Failed to execute multiscans against " + failures.size() + " tablets, retrying..."); - + try { Thread.sleep(failSleepTime); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - + // We were interrupted (close called on batchscanner) just exit log.debug("Exiting failure processing on interrupt"); return; } failSleepTime = Math.min(5000, failSleepTime * 2); - + Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>(); List<Range> allRanges = new ArrayList<Range>(); - + for (List<Range> ranges : failures.values()) allRanges.addAll(ranges); - + // since the first call to binRanges clipped the ranges to within a tablet, we should not get only // bin to the set of failed tablets binRanges(locator, allRanges, binnedRanges); - + doLookups(binnedRanges, receiver, columns); } - + private String getTableInfo() { return Tables.getPrintableTableInfoFromId(instance, table); } - + private class QueryTask implements Runnable { - + private String tsLocation; private Map<KeyExtent,List<Range>> tabletsRanges; private ResultReceiver receiver; @@ -316,7 +316,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value private final Map<KeyExtent,List<Range>> failures; private List<Column> columns; private int semaphoreSize; - + QueryTask(String tsLocation, Map<KeyExtent,List<Range>> tabletsRanges, Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns) { this.tsLocation = tsLocation; this.tabletsRanges = tabletsRanges; @@ -324,12 +324,12 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value this.columns = columns; this.failures = failures; } - + void setSemaphore(Semaphore semaphore, int semaphoreSize) { this.semaphore = semaphore; this.semaphoreSize = semaphoreSize; } - + @Override public void run() { String threadName = Thread.currentThread().getName(); @@ -349,19 +349,19 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value failures.putAll(tsFailures); } } - + } catch (IOException e) { synchronized (failures) { failures.putAll(tsFailures); failures.putAll(unscanned); } - + locator.invalidateCache(context.getInstance(), tsLocation); log.debug(e.getMessage(), e); } catch (AccumuloSecurityException e) { e.setTableInfo(getTableInfo()); log.debug(e.getMessage(), e); - + Tables.clearCache(instance); if (!Tables.exists(instance, table)) fatalException = new TableDeletedException(table); @@ -396,7 +396,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value log.debug(t.getMessage(), t); fatalException = t; } - + if (fatalException != null) { // we are finished with this batch query if (!resultsQueue.offer(LAST_BATCH)) { @@ -423,16 +423,16 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value } } } - + } - + private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, final ResultReceiver receiver, List<Column> columns) { - + if (timedoutServers.containsAll(binnedRanges.keySet())) { // all servers have timed out throw new TimedOutException(timedoutServers); } - + // when there are lots of threads and a few tablet servers // it is good to break request to tablet servers up, the // following code determines if this is the case @@ -442,16 +442,16 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) { totalNumberOfTablets += entry.getValue().size(); } - + maxTabletsPerRequest = totalNumberOfTablets / numThreads; if (maxTabletsPerRequest == 0) { maxTabletsPerRequest = 1; } - + } - + Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>(); - + if (timedoutServers.size() > 0) { // go ahead and fail any timed out servers for (Iterator<Entry<String,Map<KeyExtent,List<Range>>>> iterator = binnedRanges.entrySet().iterator(); iterator.hasNext();) { @@ -462,16 +462,16 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value } } } - + // randomize tabletserver order... this will help when there are multiple // batch readers and writers running against accumulo List<String> locations = new ArrayList<String>(binnedRanges.keySet()); Collections.shuffle(locations); - + List<QueryTask> queryTasks = new ArrayList<QueryTask>(); - + for (final String tsLocation : locations) { - + final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation); if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) { QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns); @@ -486,44 +486,44 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value tabletSubset = new HashMap<KeyExtent,List<Range>>(); } } - + if (tabletSubset.size() > 0) { QueryTask queryTask = new QueryTask(tsLocation, tabletSubset, failures, receiver, columns); queryTasks.add(queryTask); } } } - + final Semaphore semaphore = new Semaphore(queryTasks.size()); semaphore.acquireUninterruptibly(queryTasks.size()); - + for (QueryTask queryTask : queryTasks) { queryTask.setSemaphore(semaphore, queryTasks.size()); queryThreadPool.execute(new TraceRunnable(queryTask)); } } - + static void trackScanning(Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned, MultiScanResult scanResult) { - + // translate returned failures, remove them from unscanned, and add them to failures Map<KeyExtent,List<Range>> retFailures = Translator.translate(scanResult.failures, Translators.TKET, new Translator.ListTranslator<TRange,Range>( Translators.TRT)); unscanned.keySet().removeAll(retFailures.keySet()); failures.putAll(retFailures); - + // translate full scans and remove them from unscanned HashSet<KeyExtent> fullScans = new HashSet<KeyExtent>(Translator.translate(scanResult.fullScans, Translators.TKET)); unscanned.keySet().removeAll(fullScans); - + // remove partial scan from unscanned if (scanResult.partScan != null) { KeyExtent ke = new KeyExtent(scanResult.partScan); Key nextKey = new Key(scanResult.partNextKey); - + ListIterator<Range> iterator = unscanned.get(ke).listIterator(); while (iterator.hasNext()) { Range range = iterator.next(); - + if (range.afterEndKey(nextKey) || (nextKey.equals(range.getEndKey()) && scanResult.partNextKeyInclusive != range.isEndKeyInclusive())) { iterator.remove(); } else if (range.contains(nextKey)) { @@ -534,41 +534,41 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value } } } - + private static class TimeoutTracker { - + String server; Set<String> badServers; long timeOut; long activityTime; Long firstErrorTime = null; - + TimeoutTracker(String server, Set<String> badServers, long timeOut) { this(timeOut); this.server = server; this.badServers = badServers; } - + TimeoutTracker(long timeOut) { this.timeOut = timeOut; } - + void startingScan() { activityTime = System.currentTimeMillis(); } - + void check() throws IOException { if (System.currentTimeMillis() - activityTime > timeOut) { badServers.add(server); throw new IOException("Time exceeded " + (System.currentTimeMillis() - activityTime) + " " + server); } } - + void madeProgress() { activityTime = System.currentTimeMillis(); firstErrorTime = null; } - + void errorOccured(Exception e) { if (firstErrorTime == null) { firstErrorTime = activityTime; @@ -576,28 +576,28 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value badServers.add(server); } } - + /** */ public long getTimeOut() { return timeOut; } } - + public static void doLookup(ClientContext context, String server, Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned, ResultReceiver receiver, List<Column> columns, ScannerOptions options, Authorizations authorizations) throws IOException, AccumuloSecurityException, AccumuloServerException { doLookup(context, server, requested, failures, unscanned, receiver, columns, options, authorizations, new TimeoutTracker(Long.MAX_VALUE)); } - + static void doLookup(ClientContext context, String server, Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned, ResultReceiver receiver, List<Column> columns, ScannerOptions options, Authorizations authorizations, TimeoutTracker timeoutTracker) throws IOException, AccumuloSecurityException, AccumuloServerException { - + if (requested.size() == 0) { return; } - + // copy requested to unscanned map. we will remove ranges as they are scanned in trackScanning() for (Entry<KeyExtent,List<Range>> entry : requested.entrySet()) { ArrayList<Range> ranges = new ArrayList<Range>(); @@ -606,7 +606,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value } unscanned.put(new KeyExtent(entry.getKey()), ranges); } - + timeoutTracker.startingScan(); TTransport transport = null; try { @@ -618,13 +618,13 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value client = ThriftUtil.getTServerClient(parsedServer, context); try { - + OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Starting multi scan, tserver=" + server + " #tablets=" + requested.size() + " #ranges=" + sumSizes(requested.values()) + " ssil=" + options.serverSideIteratorList + " ssio=" + options.serverSideIteratorOptions); - + TabletType ttype = TabletType.type(requested.keySet()); boolean waitForWrites = !ThriftScanner.serversWaitedForWrites.get(ttype).contains(server); - + Map<TKeyExtent,List<TRange>> thriftTabletRanges = Translator.translate(requested, Translators.KET, new Translator.ListTranslator<Range,TRange>( Translators.RT)); InitialMultiScan imsr = client.startMultiScan(Tracer.traceInfo(), context.rpcCreds(), thriftTabletRanges, @@ -634,48 +634,48 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value ThriftScanner.serversWaitedForWrites.get(ttype).add(server.toString()); MultiScanResult scanResult = imsr.result; - + opTimer.stop("Got 1st multi scan results, #results=" + scanResult.results.size() + (scanResult.more ? " scanID=" + imsr.scanID : "") + " in %DURATION%"); - + ArrayList<Entry<Key,Value>> entries = new ArrayList<Map.Entry<Key,Value>>(scanResult.results.size()); for (TKeyValue kv : scanResult.results) { entries.add(new SimpleImmutableEntry<Key,Value>(new Key(kv.key), new Value(kv.value))); } - + if (entries.size() > 0) receiver.receive(entries); - + if (entries.size() > 0 || scanResult.fullScans.size() > 0) timeoutTracker.madeProgress(); - + trackScanning(failures, unscanned, scanResult); - + while (scanResult.more) { - + timeoutTracker.check(); - + opTimer.start("Continuing multi scan, scanid=" + imsr.scanID); scanResult = client.continueMultiScan(Tracer.traceInfo(), imsr.scanID); opTimer.stop("Got more multi scan results, #results=" + scanResult.results.size() + (scanResult.more ? " scanID=" + imsr.scanID : "") + " in %DURATION%"); - + entries = new ArrayList<Map.Entry<Key,Value>>(scanResult.results.size()); for (TKeyValue kv : scanResult.results) { entries.add(new SimpleImmutableEntry<Key,Value>(new Key(kv.key), new Value(kv.value))); } - + if (entries.size() > 0) receiver.receive(entries); - + if (entries.size() > 0 || scanResult.fullScans.size() > 0) timeoutTracker.madeProgress(); - + trackScanning(failures, unscanned, scanResult); } - + client.closeMultiScan(Tracer.traceInfo(), imsr.scanID); - + } finally { ThriftUtil.returnClient(client); } @@ -700,14 +700,14 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value ThriftTransportPool.getInstance().returnTransport(transport); } } - + static int sumSizes(Collection<List<Range>> values) { int sum = 0; - + for (List<Range> list : values) { sum += list.size(); } - + return sum; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java index c54c2f1..7abc826 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java @@ -76,45 +76,45 @@ import com.google.common.net.HostAndPort; /* * Differences from previous TabletServerBatchWriter * + As background threads finish sending mutations to tablet servers they decrement memory usage - * + Once the queue of unprocessed mutations reaches 50% it is always pushed to the background threads, - * even if they are currently processing... new mutations are merged with mutations currently + * + Once the queue of unprocessed mutations reaches 50% it is always pushed to the background threads, + * even if they are currently processing... new mutations are merged with mutations currently * processing in the background * + Failed mutations are held for 1000ms and then re-added to the unprocessed queue * + Flush holds adding of new mutations so it does not wait indefinitely - * + * * Considerations * + All background threads must catch and note Throwable - * + mutations for a single tablet server are only processed by one thread concurrently (if new mutations - * come in for a tablet server while one thread is processing mutations for it, no other thread should + * + mutations for a single tablet server are only processed by one thread concurrently (if new mutations + * come in for a tablet server while one thread is processing mutations for it, no other thread should * start processing those mutations) - * + * * Memory accounting * + when a mutation enters the system memory is incremented * + when a mutation successfully leaves the system memory is decremented - * - * - * + * + * + * */ public class TabletServerBatchWriter { - + private static final Logger log = Logger.getLogger(TabletServerBatchWriter.class); - + // basic configuration private final ClientContext context; private final long maxMem; private final long maxLatency; private final long timeout; private final Durability durability; - + // state private boolean flushing; private boolean closed; private MutationSet mutations; - + // background writer private final MutationWriter writer; - + // latency timers private final Timer jtimer = new Timer("BatchWriterLatencyTimer", true); private final Map<String,TimeoutTracker> timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchWriter.TimeoutTracker>()); @@ -122,7 +122,7 @@ public class TabletServerBatchWriter { // stats private long totalMemUsed = 0; private long lastProcessingStartTime; - + private long totalAdded = 0; private final AtomicLong totalSent = new AtomicLong(0); private final AtomicLong totalBinned = new AtomicLong(0); @@ -132,7 +132,7 @@ public class TabletServerBatchWriter { private long initialGCTimes; private long initialCompileTimes; private double initialSystemLoad; - + private int tabletServersBatchSum = 0; private int tabletBatchSum = 0; private int numBatches = 0; @@ -140,7 +140,7 @@ public class TabletServerBatchWriter { private int minTabletBatch = Integer.MAX_VALUE; private int minTabletServersBatch = Integer.MAX_VALUE; private int maxTabletServersBatch = Integer.MIN_VALUE; - + // error handling private final Violations violations = new Violations(); private final Map<KeyExtent,Set<SecurityErrorCode>> authorizationFailures = new HashMap<KeyExtent,Set<SecurityErrorCode>>(); @@ -156,21 +156,21 @@ public class TabletServerBatchWriter { final long timeOut; long activityTime; Long firstErrorTime = null; - + TimeoutTracker(String server, long timeOut) { this.timeOut = timeOut; this.server = server; } - + void startingWrite() { activityTime = System.currentTimeMillis(); } - + void madeProgress() { activityTime = System.currentTimeMillis(); firstErrorTime = null; } - + void wroteNothing() { if (firstErrorTime == null) { firstErrorTime = activityTime; @@ -178,16 +178,16 @@ public class TabletServerBatchWriter { throw new TimedOutException(Collections.singleton(server)); } } - + void errorOccured(Exception e) { wroteNothing(); } - + public long getTimeOut() { return timeOut; } } - + public TabletServerBatchWriter(ClientContext context, BatchWriterConfig config) { this.context = context; this.maxMem = config.getMaxMemory(); @@ -196,9 +196,9 @@ public class TabletServerBatchWriter { this.mutations = new MutationSet(); this.lastProcessingStartTime = System.currentTimeMillis(); this.durability = config.getDurability(); - + this.writer = new MutationWriter(config.getMaxWriteThreads()); - + if (this.maxLatency != Long.MAX_VALUE) { jtimer.schedule(new TimerTask() { @Override @@ -215,7 +215,7 @@ public class TabletServerBatchWriter { }, 0, this.maxLatency / 4); } } - + private synchronized void startProcessing() { if (mutations.getMemoryUsed() == 0) return; @@ -223,125 +223,125 @@ public class TabletServerBatchWriter { writer.addMutations(mutations); mutations = new MutationSet(); } - + private synchronized void decrementMemUsed(long amount) { totalMemUsed -= amount; this.notifyAll(); } - + public synchronized void addMutation(String table, Mutation m) throws MutationsRejectedException { - + if (closed) throw new IllegalStateException("Closed"); if (m.size() == 0) throw new IllegalArgumentException("Can not add empty mutations"); - + checkForFailures(); - + while ((totalMemUsed > maxMem || flushing) && !somethingFailed) { waitRTE(); } - + // do checks again since things could have changed while waiting and not holding lock if (closed) throw new IllegalStateException("Closed"); checkForFailures(); - + if (startTime == 0) { startTime = System.currentTimeMillis(); - + List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) { initialGCTimes += garbageCollectorMXBean.getCollectionTime(); } - + CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean(); if (compMxBean.isCompilationTimeMonitoringSupported()) { initialCompileTimes = compMxBean.getTotalCompilationTime(); } - + initialSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage(); } - + // create a copy of mutation so that after this method returns the user // is free to reuse the mutation object, like calling readFields... this // is important for the case where a mutation is passed from map to reduce // to batch writer... the map reduce code will keep passing the same mutation // object into the reduce method m = new Mutation(m); - + totalMemUsed += m.estimatedMemoryUsed(); mutations.addMutation(table, m); totalAdded++; - + if (mutations.getMemoryUsed() >= maxMem / 2) { startProcessing(); checkForFailures(); } } - + public void addMutation(String table, Iterator<Mutation> iterator) throws MutationsRejectedException { while (iterator.hasNext()) { addMutation(table, iterator.next()); } } - + public synchronized void flush() throws MutationsRejectedException { - + if (closed) throw new IllegalStateException("Closed"); - + Span span = Trace.start("flush"); - + try { checkForFailures(); - + if (flushing) { // some other thread is currently flushing, so wait while (flushing && !somethingFailed) waitRTE(); - + checkForFailures(); - + return; } - + flushing = true; - + startProcessing(); checkForFailures(); - + while (totalMemUsed > 0 && !somethingFailed) { waitRTE(); } - + flushing = false; this.notifyAll(); - + checkForFailures(); } finally { span.stop(); // somethingFailed = false; } } - + public synchronized void close() throws MutationsRejectedException { - + if (closed) return; - + Span span = Trace.start("close"); try { closed = true; - + startProcessing(); - + while (totalMemUsed > 0 && !somethingFailed) { waitRTE(); } - + logStats(); - + checkForFailures(); } finally { // make a best effort to release these resources @@ -350,27 +350,27 @@ public class TabletServerBatchWriter { span.stop(); } } - + private void logStats() { long finishTime = System.currentTimeMillis(); - + long finalGCTimes = 0; List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) { finalGCTimes += garbageCollectorMXBean.getCollectionTime(); } - + CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean(); long finalCompileTimes = 0; if (compMxBean.isCompilationTimeMonitoringSupported()) { finalCompileTimes = compMxBean.getTotalCompilationTime(); } - + double averageRate = totalSent.get() / (totalSendTime.get() / 1000.0); double overallRate = totalAdded / ((finishTime - startTime) / 1000.0); - + double finalSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage(); - + if (log.isTraceEnabled()) { log.trace(""); log.trace("TABLET SERVER BATCH WRITER STATISTICS"); @@ -400,39 +400,39 @@ public class TabletServerBatchWriter { log.trace(String.format("System load average : initial=%6.2f final=%6.2f", initialSystemLoad, finalSystemLoad)); } } - + private void updateSendStats(long count, long time) { totalSent.addAndGet(count); totalSendTime.addAndGet(time); } - + public void updateBinningStats(int count, long time, Map<String,TabletServerMutations<Mutation>> binnedMutations) { totalBinTime.addAndGet(time); totalBinned.addAndGet(count); updateBatchStats(binnedMutations); } - + private synchronized void updateBatchStats(Map<String,TabletServerMutations<Mutation>> binnedMutations) { tabletServersBatchSum += binnedMutations.size(); - + minTabletServersBatch = Math.min(minTabletServersBatch, binnedMutations.size()); maxTabletServersBatch = Math.max(maxTabletServersBatch, binnedMutations.size()); - + int numTablets = 0; - + for (Entry<String,TabletServerMutations<Mutation>> entry : binnedMutations.entrySet()) { TabletServerMutations<Mutation> tsm = entry.getValue(); numTablets += tsm.getMutations().size(); } - + tabletBatchSum += numTablets; - + minTabletBatch = Math.min(minTabletBatch, numTablets); maxTabletBatch = Math.max(maxTabletBatch, numTablets); - + numBatches++; } - + private void waitRTE() { try { wait(); @@ -440,9 +440,9 @@ public class TabletServerBatchWriter { throw new RuntimeException(e); } } - + // BEGIN code for handling unrecoverable errors - + private void updatedConstraintViolations(List<ConstraintViolationSummary> cvsList) { if (cvsList.size() > 0) { synchronized (this) { @@ -452,28 +452,28 @@ public class TabletServerBatchWriter { } } } - + private void updateAuthorizationFailures(Set<KeyExtent> keySet, SecurityErrorCode code) { HashMap<KeyExtent,SecurityErrorCode> map = new HashMap<KeyExtent,SecurityErrorCode>(); for (KeyExtent ke : keySet) map.put(ke, code); - + updateAuthorizationFailures(map); } - + private void updateAuthorizationFailures(Map<KeyExtent,SecurityErrorCode> authorizationFailures) { if (authorizationFailures.size() > 0) { - + // was a table deleted? HashSet<String> tableIds = new HashSet<String>(); for (KeyExtent ke : authorizationFailures.keySet()) tableIds.add(ke.getTableId().toString()); - + Tables.clearCache(context.getInstance()); for (String tableId : tableIds) if (!Tables.exists(context.getInstance(), tableId)) throw new TableDeletedException(tableId); - + synchronized (this) { somethingFailed = true; mergeAuthorizationFailures(this.authorizationFailures, authorizationFailures); @@ -481,7 +481,7 @@ public class TabletServerBatchWriter { } } } - + private void mergeAuthorizationFailures(Map<KeyExtent,Set<SecurityErrorCode>> source, Map<KeyExtent,SecurityErrorCode> addition) { for (Entry<KeyExtent,SecurityErrorCode> entry : addition.entrySet()) { Set<SecurityErrorCode> secs = source.get(entry.getKey()); @@ -492,14 +492,14 @@ public class TabletServerBatchWriter { secs.add(entry.getValue()); } } - + private synchronized void updateServerErrors(String server, Exception e) { somethingFailed = true; this.serverSideErrors.add(server); this.notifyAll(); log.error("Server side error on " + server + ": " + e); } - + private synchronized void updateUnknownErrors(String msg, Throwable t) { somethingFailed = true; unknownErrors++; @@ -510,29 +510,29 @@ public class TabletServerBatchWriter { else log.error(msg, t); } - + private void checkForFailures() throws MutationsRejectedException { if (somethingFailed) { List<ConstraintViolationSummary> cvsList = violations.asList(); HashMap<KeyExtent,Set<org.apache.accumulo.core.client.security.SecurityErrorCode>> af = new HashMap<KeyExtent,Set<org.apache.accumulo.core.client.security.SecurityErrorCode>>(); for (Entry<KeyExtent,Set<SecurityErrorCode>> entry : authorizationFailures.entrySet()) { HashSet<org.apache.accumulo.core.client.security.SecurityErrorCode> codes = new HashSet<org.apache.accumulo.core.client.security.SecurityErrorCode>(); - + for (SecurityErrorCode sce : entry.getValue()) { codes.add(org.apache.accumulo.core.client.security.SecurityErrorCode.valueOf(sce.name())); } - + af.put(entry.getKey(), codes); } - + throw new MutationsRejectedException(context.getInstance(), cvsList, af, serverSideErrors, unknownErrors, lastUnknownError); } } - + // END code for handling unrecoverable errors - + // BEGIN code for handling failed mutations - + /** * Add mutations that previously failed back into the mix */ @@ -542,16 +542,16 @@ public class TabletServerBatchWriter { startProcessing(); } } - + private class FailedMutations extends TimerTask { - + private MutationSet recentFailures = null; private long initTime; - + FailedMutations() { jtimer.schedule(this, 0, 500); } - + private MutationSet init() { if (recentFailures == null) { recentFailures = new MutationSet(); @@ -559,35 +559,35 @@ public class TabletServerBatchWriter { } return recentFailures; } - + synchronized void add(String table, ArrayList<Mutation> tableFailures) { init().addAll(table, tableFailures); } - + synchronized void add(MutationSet failures) { init().addAll(failures); } - + synchronized void add(String location, TabletServerMutations<Mutation> tsm) { init(); for (Entry<KeyExtent,List<Mutation>> entry : tsm.getMutations().entrySet()) { recentFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue()); } - + } - + @Override public void run() { try { MutationSet rf = null; - + synchronized (this) { if (recentFailures != null && System.currentTimeMillis() - initTime > 1000) { rf = recentFailures; recentFailures = null; } } - + if (rf != null) { if (log.isTraceEnabled()) log.trace("tid=" + Thread.currentThread().getId() + " Requeuing " + rf.size() + " failed mutations"); @@ -599,26 +599,26 @@ public class TabletServerBatchWriter { } } } - + // END code for handling failed mutations - + // BEGIN code for sending mutations to tablet servers using background threads - + private class MutationWriter { - + private static final int MUTATION_BATCH_SIZE = 1 << 17; private final ExecutorService sendThreadPool; private final Map<String,TabletServerMutations<Mutation>> serversMutations; private final Set<String> queued; private final Map<String,TabletLocator> locators; - + public MutationWriter(int numSendThreads) { serversMutations = new HashMap<String,TabletServerMutations<Mutation>>(); queued = new HashSet<String>(); sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName()); locators = new HashMap<String,TabletLocator>(); } - + private TabletLocator getLocator(String tableId) { TabletLocator ret = locators.get(tableId); if (ret == null) { @@ -626,10 +626,10 @@ public class TabletServerBatchWriter { ret = new TimeoutTabletLocator(ret, timeout); locators.put(tableId, ret); } - + return ret; } - + private void binMutations(MutationSet mutationsToProcess, Map<String,TabletServerMutations<Mutation>> binnedMutations) { String tableId = null; try { @@ -637,17 +637,17 @@ public class TabletServerBatchWriter { for (Entry<String,List<Mutation>> entry : es) { tableId = entry.getKey(); TabletLocator locator = getLocator(tableId); - + String table = entry.getKey(); List<Mutation> tableMutations = entry.getValue(); - + if (tableMutations != null) { ArrayList<Mutation> tableFailures = new ArrayList<Mutation>(); locator.binMutations(context, tableMutations, binnedMutations, tableFailures); - + if (tableFailures.size() > 0) { failedMutations.add(table, tableFailures); - + if (tableFailures.size() == tableMutations.size()) if (!Tables.exists(context.getInstance(), entry.getKey())) throw new TableDeletedException(entry.getKey()); @@ -655,7 +655,7 @@ public class TabletServerBatchWriter { throw new TableOfflineException(context.getInstance(), entry.getKey()); } } - + } return; } catch (AccumuloServerException ase) { @@ -673,12 +673,12 @@ public class TabletServerBatchWriter { } catch (TableNotFoundException e) { updateUnknownErrors(e.getMessage(), e); } - + // an error ocurred binnedMutations.clear(); - + } - + void addMutations(MutationSet mutationsToSend) { Map<String,TabletServerMutations<Mutation>> binnedMutations = new HashMap<String,TabletServerMutations<Mutation>>(); Span span = Trace.start("binMutations"); @@ -692,17 +692,17 @@ public class TabletServerBatchWriter { } addMutations(binnedMutations); } - + private synchronized void addMutations(Map<String,TabletServerMutations<Mutation>> binnedMutations) { - + int count = 0; - + // merge mutations into existing mutations for a tablet server for (Entry<String,TabletServerMutations<Mutation>> entry : binnedMutations.entrySet()) { String server = entry.getKey(); - + TabletServerMutations<Mutation> currentMutations = serversMutations.get(server); - + if (currentMutations == null) { serversMutations.put(server, entry.getValue()); } else { @@ -712,136 +712,136 @@ public class TabletServerBatchWriter { } } } - + if (log.isTraceEnabled()) for (Entry<KeyExtent,List<Mutation>> entry2 : entry.getValue().getMutations().entrySet()) count += entry2.getValue().size(); - + } - + if (count > 0 && log.isTraceEnabled()) log.trace(String.format("Started sending %,d mutations to %,d tablet servers", count, binnedMutations.keySet().size())); - + // randomize order of servers ArrayList<String> servers = new ArrayList<String>(binnedMutations.keySet()); Collections.shuffle(servers); - + for (String server : servers) if (!queued.contains(server)) { sendThreadPool.submit(Trace.wrap(new SendTask(server))); queued.add(server); } } - + private synchronized TabletServerMutations<Mutation> getMutationsToSend(String server) { TabletServerMutations<Mutation> tsmuts = serversMutations.remove(server); if (tsmuts == null) queued.remove(server); - + return tsmuts; } - + class SendTask implements Runnable { - + final private String location; - + SendTask(String server) { this.location = server; } - + @Override public void run() { try { TabletServerMutations<Mutation> tsmuts = getMutationsToSend(location); - + while (tsmuts != null) { send(tsmuts); tsmuts = getMutationsToSend(location); } - + return; } catch (Throwable t) { updateUnknownErrors("Failed to send tablet server " + location + " its batch : " + t.getMessage(), t); } } - + public void send(TabletServerMutations<Mutation> tsm) throws AccumuloServerException, AccumuloSecurityException { - + MutationSet failures = null; - + String oldName = Thread.currentThread().getName(); - + Map<KeyExtent,List<Mutation>> mutationBatch = tsm.getMutations(); try { - + long count = 0; for (List<Mutation> list : mutationBatch.values()) { count += list.size(); } String msg = "sending " + String.format("%,d", count) + " mutations to " + String.format("%,d", mutationBatch.size()) + " tablets at " + location; Thread.currentThread().setName(msg); - + Span span = Trace.start("sendMutations"); try { - + TimeoutTracker timeoutTracker = timeoutTrackers.get(location); if (timeoutTracker == null) { timeoutTracker = new TimeoutTracker(location, timeout); timeoutTrackers.put(location, timeoutTracker); } - + long st1 = System.currentTimeMillis(); failures = sendMutationsToTabletServer(location, mutationBatch, timeoutTracker); long st2 = System.currentTimeMillis(); if (log.isTraceEnabled()) log.trace("sent " + String.format("%,d", count) + " mutations to " + location + " in " + String.format("%.2f secs (%,.2f mutations/sec) with %,d failures", (st2 - st1) / 1000.0, count / ((st2 - st1) / 1000.0), failures.size())); - + long successBytes = 0; for (Entry<KeyExtent,List<Mutation>> entry : mutationBatch.entrySet()) { for (Mutation mutation : entry.getValue()) { successBytes += mutation.estimatedMemoryUsed(); } } - + if (failures.size() > 0) { failedMutations.add(failures); successBytes -= failures.getMemoryUsed(); } - + updateSendStats(count, st2 - st1); decrementMemUsed(successBytes); - + } finally { span.stop(); } } catch (IOException e) { if (log.isTraceEnabled()) log.trace("failed to send mutations to " + location + " : " + e.getMessage()); - + HashSet<String> tables = new HashSet<String>(); for (KeyExtent ke : mutationBatch.keySet()) tables.add(ke.getTableId().toString()); - + for (String table : tables) TabletLocator.getLocator(context, new Text(table)).invalidateCache(context.getInstance(), location); - + failedMutations.add(location, tsm); } finally { Thread.currentThread().setName(oldName); } } } - + private MutationSet sendMutationsToTabletServer(String location, Map<KeyExtent,List<Mutation>> tabMuts, TimeoutTracker timeoutTracker) throws IOException, AccumuloSecurityException, AccumuloServerException { if (tabMuts.size() == 0) { return new MutationSet(); } TInfo tinfo = Tracer.traceInfo(); - + timeoutTracker.startingWrite(); - + try { final HostAndPort parsedServer = HostAndPort.fromString(location); final TabletClientService.Iface client; @@ -853,10 +853,10 @@ public class TabletServerBatchWriter { try { MutationSet allFailures = new MutationSet(); - + if (tabMuts.size() == 1 && tabMuts.values().iterator().next().size() == 1) { Entry<KeyExtent,List<Mutation>> entry = tabMuts.entrySet().iterator().next(); - + try { client.update(tinfo, context.rpcCreds(), entry.getKey().toThrift(), entry.getValue().get(0).toThrift(), DurabilityImpl.toThrift(durability)); } catch (NotServingTabletException e) { @@ -867,9 +867,9 @@ public class TabletServerBatchWriter { } timeoutTracker.madeProgress(); } else { - + long usid = client.startUpdate(tinfo, context.rpcCreds(), DurabilityImpl.toThrift(durability)); - + List<TMutation> updates = new ArrayList<TMutation>(); for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) { long size = 0; @@ -880,34 +880,34 @@ public class TabletServerBatchWriter { updates.add(mutation.toThrift()); size += mutation.numBytes(); } - + client.applyUpdates(tinfo, usid, entry.getKey().toThrift(), updates); updates.clear(); size = 0; } } - + UpdateErrors updateErrors = client.closeUpdate(tinfo, usid); - + Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translators.TKET); updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translators.TCVST)); updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translators.TKET)); - + long totalCommitted = 0; - + for (Entry<KeyExtent,Long> entry : failures.entrySet()) { KeyExtent failedExtent = entry.getKey(); int numCommitted = (int) (long) entry.getValue(); totalCommitted += numCommitted; - + String table = failedExtent.getTableId().toString(); - + TabletLocator.getLocator(context, new Text(table)).invalidateCache(failedExtent); - + ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent); allFailures.addAll(table, mutations.subList(numCommitted, mutations.size())); } - + if (failures.keySet().containsAll(tabMuts.keySet()) && totalCommitted == 0) { // nothing was successfully written timeoutTracker.wroteNothing(); @@ -936,34 +936,34 @@ public class TabletServerBatchWriter { } } } - + // END code for sending mutations to tablet servers using background threads - + private static class MutationSet { - + private final HashMap<String,List<Mutation>> mutations; private int memoryUsed = 0; - + MutationSet() { mutations = new HashMap<String,List<Mutation>>(); } - + void addMutation(String table, Mutation mutation) { List<Mutation> tabMutList = mutations.get(table); if (tabMutList == null) { tabMutList = new ArrayList<Mutation>(); mutations.put(table, tabMutList); } - + tabMutList.add(mutation); - + memoryUsed += mutation.estimatedMemoryUsed(); } - + Map<String,List<Mutation>> getMutations() { return mutations; } - + int size() { int result = 0; for (List<Mutation> perTable : mutations.values()) { @@ -971,28 +971,28 @@ public class TabletServerBatchWriter { } return result; } - + public void addAll(MutationSet failures) { Set<Entry<String,List<Mutation>>> es = failures.getMutations().entrySet(); - + for (Entry<String,List<Mutation>> entry : es) { String table = entry.getKey(); - + for (Mutation mutation : entry.getValue()) { addMutation(table, mutation); } } } - + public void addAll(String table, List<Mutation> mutations) { for (Mutation mutation : mutations) { addMutation(table, mutation); } } - + public int getMemoryUsed() { return memoryUsed; } - + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java index 3adcca9..d57bf94 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java @@ -22,7 +22,7 @@ import org.apache.accumulo.core.data.KeyExtent; public enum TabletType { ROOT, METADATA, USER; - + public static TabletType type(KeyExtent ke) { if (ke.isRootTablet()) return ROOT; @@ -30,20 +30,20 @@ public enum TabletType { return METADATA; return USER; } - + public static TabletType type(Collection<KeyExtent> extents) { if (extents.size() == 0) throw new IllegalArgumentException(); - + TabletType ttype = null; - + for (KeyExtent extent : extents) { if (ttype == null) ttype = type(extent); else if (ttype != type(extent)) throw new IllegalArgumentException("multiple extent types not allowed " + ttype + " " + type(extent)); } - + return ttype; } }