Repository: accumulo Updated Branches: refs/heads/1.5.2-SNAPSHOT f7fe2a847 -> f8861bf3e
ACCUMULO-2827: HeapIterator optimization. Assumes it's more probable that the next entry in a merge comes from the current iterator. Also switches from PriorityBuffer to PriorityQueue<> to avoid unsafe casts. Signed-off-by: Keith Turner <ktur...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9801fe97 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9801fe97 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9801fe97 Branch: refs/heads/1.5.2-SNAPSHOT Commit: 9801fe976ab5b86b3ed337daff586b96d2cb7e4d Parents: 886cd19 Author: Jonathan Park <p...@sqrrl.com> Authored: Mon Jun 23 15:07:27 2014 -0400 Committer: Keith Turner <ktur...@apache.org> Committed: Thu Jun 26 10:45:51 2014 -0400 ---------------------------------------------------------------------- .../core/iterators/system/HeapIterator.java | 132 ++++++++++--------- 1 file changed, 73 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9801fe97/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java index e54f37c..4b26dcc 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java @@ -17,99 +17,113 @@ package org.apache.accumulo.core.iterators.system; import java.io.IOException; +import java.util.Comparator; +import java.util.PriorityQueue; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import
org.apache.commons.collections.buffer.PriorityBuffer; public abstract class HeapIterator implements SortedKeyValueIterator<Key,Value> { - private PriorityBuffer heap; - private SortedKeyValueIterator<Key,Value> currentIter; - - private static class Index implements Comparable<Index> { - SortedKeyValueIterator<Key,Value> iter; - - public Index(SortedKeyValueIterator<Key,Value> iter) { - this.iter = iter; - } - - public int compareTo(Index o) { - return iter.getTopKey().compareTo(o.iter.getTopKey()); + private PriorityQueue<SortedKeyValueIterator<Key,Value>> heap; + private SortedKeyValueIterator<Key,Value> topIdx = null; + private Key nextKey; + + private static class SKVIComparator implements Comparator<SortedKeyValueIterator<Key,Value>> { + + @Override + public int compare(SortedKeyValueIterator<Key,Value> o1, SortedKeyValueIterator<Key,Value> o2) { + return o1.getTopKey().compareTo(o2.getTopKey()); } } - + protected HeapIterator() { heap = null; } - + protected HeapIterator(int maxSize) { createHeap(maxSize); } - + protected void createHeap(int maxSize) { if (heap != null) throw new IllegalStateException("heap already exist"); - - heap = new PriorityBuffer(maxSize == 0 ? 1 : maxSize); + + heap = new PriorityQueue<SortedKeyValueIterator<Key,Value>>(maxSize == 0 ? 
1 : maxSize, new SKVIComparator()); } - + @Override final public Key getTopKey() { - return currentIter.getTopKey(); + return topIdx.getTopKey(); } - + @Override final public Value getTopValue() { - return currentIter.getTopValue(); + return topIdx.getTopValue(); } - + @Override final public boolean hasTop() { - return heap.size() > 0; + return topIdx != null; } - + @Override final public void next() throws IOException { - switch (heap.size()) { - case 0: - throw new IllegalStateException("Called next() when there is no top"); - case 1: - // optimization for case when heap contains one entry, - // avoids remove and add - currentIter.next(); - if (!currentIter.hasTop()) { - heap.remove(); - currentIter = null; - } - break; - default: - Index idx = (Index) heap.remove(); - idx.iter.next(); - if (idx.iter.hasTop()) { - heap.add(idx); - } - // to get to the default case heap has at least - // two entries, therefore there must be at least - // one entry when get() is called below - currentIter = ((Index) heap.get()).iter; + if (topIdx == null) { + throw new IllegalStateException("Called next() when there is no top"); + } + + topIdx.next(); + if (!topIdx.hasTop()) { + if (nextKey == null) { + // No iterators left + topIdx = null; + return; + } + + pullReferencesFromHeap(); + } else { + if (nextKey == null) { + // topIdx is the only iterator + return; + } + + if (nextKey.compareTo(topIdx.getTopKey()) < 0) { + // Grab the next top iterator and put the current top iterator back on the heap + // This updating of references is special-cased to save on percolation on edge cases + // since the current top is guaranteed to not be the minimum + SortedKeyValueIterator<Key,Value> nextTopIdx = heap.remove(); + heap.add(topIdx); + + topIdx = nextTopIdx; + nextKey = heap.peek().getTopKey(); + } } } - + + private void pullReferencesFromHeap() { + topIdx = heap.remove(); + if (!heap.isEmpty()) { + nextKey = heap.peek().getTopKey(); + } else { + nextKey = null; + } + } + final protected 
void clear() { heap.clear(); - currentIter = null; + topIdx = null; + nextKey = null; } - + final protected void addSource(SortedKeyValueIterator<Key,Value> source) { - - if (source.hasTop()) - heap.add(new Index(source)); - - if (heap.size() > 0) - currentIter = ((Index) heap.get()).iter; - else - currentIter = null; + if (source.hasTop()) { + heap.add(source); + if (topIdx != null) { + heap.add(topIdx); + } + + pullReferencesFromHeap(); + } } - }