ACCUMULO-3097 Add some class-level javadoc for some iterators. Formatter also fixed some whitespace issues.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b0c3ba8c Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b0c3ba8c Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b0c3ba8c Branch: refs/heads/master Commit: b0c3ba8c5a6654f4409172202d793e9e9dd5d423 Parents: f838cd9 Author: Josh Elser <els...@apache.org> Authored: Wed Sep 3 13:24:44 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Wed Sep 3 13:29:15 2014 -0400 ---------------------------------------------------------------------- .../core/iterators/SkippingIterator.java | 13 +++- .../core/iterators/system/HeapIterator.java | 4 + .../system/SourceSwitchingIterator.java | 77 +++++++++++--------- .../iterators/system/SynchronizedIterator.java | 23 +++--- 4 files changed, 66 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/b0c3ba8c/core/src/main/java/org/apache/accumulo/core/iterators/SkippingIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/SkippingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/SkippingIterator.java index a73d997..9c18590 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/SkippingIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/SkippingIterator.java @@ -22,20 +22,25 @@ import java.util.Collection; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Range; +/** + * Every call to {@link #next()} and {@link #seek(Range, Collection, boolean)} calls the parent's implementation and then calls the implementation's + * {@link #consume()}. This provides a cleaner way for implementers to write code that will read one to many key-value pairs, as opposed to returning every + * key-value pair seen. 
+ */ public abstract class SkippingIterator extends WrappingIterator { - + @Override public void next() throws IOException { super.next(); consume(); } - + protected abstract void consume() throws IOException; - + @Override public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { super.seek(range, columnFamilies, inclusive); consume(); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b0c3ba8c/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java index 4b26dcc..8f2f66c 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java @@ -24,6 +24,10 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +/** + * Constructs a {@link PriorityQueue} of multiple SortedKeyValueIterators. Provides a simple way to interact with multiple SortedKeyValueIterators in sorted + * order. 
+ */ public abstract class HeapIterator implements SortedKeyValueIterator<Key,Value> { private PriorityQueue<SortedKeyValueIterator<Key,Value>> heap; private SortedKeyValueIterator<Key,Value> topIdx = null; http://git-wip-us.apache.org/repos/asf/accumulo/blob/b0c3ba8c/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java index 46d2007..33d0ebf 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SourceSwitchingIterator.java @@ -32,33 +32,38 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +/** + * A SortedKeyValueIterator which presents a view over some section of data, regardless of whether or not it is backed by memory (InMemoryMap) or an RFile + * (InMemoryMap that was minor compacted to a file). Clients reading from a table that has data in memory should not see interruption in their scan when that + * data is minor compacted. This iterator is designed to manage this behind the scenes. 
+ */ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value>, InterruptibleIterator { - + public interface DataSource { boolean isCurrent(); - + DataSource getNewDataSource(); - + DataSource getDeepCopyDataSource(IteratorEnvironment env); - + SortedKeyValueIterator<Key,Value> iterator() throws IOException; } - + private DataSource source; private SortedKeyValueIterator<Key,Value> iter; - + private Key key; private Value val; - + private Range range; private boolean inclusive; private Collection<ByteSequence> columnFamilies; - + private boolean onlySwitchAfterRow; private AtomicBoolean iflag; - + private final List<SourceSwitchingIterator> copies; - + private SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow, List<SourceSwitchingIterator> copies, AtomicBoolean iflag) { this.source = source; this.onlySwitchAfterRow = onlySwitchAfterRow; @@ -66,51 +71,51 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value this.iflag = iflag; copies.add(this); } - + public SourceSwitchingIterator(DataSource source, boolean onlySwitchAfterRow) { this(source, onlySwitchAfterRow, Collections.synchronizedList(new ArrayList<SourceSwitchingIterator>()), null); } - + public SourceSwitchingIterator(DataSource source) { this(source, false); } - + @Override public synchronized SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { return new SourceSwitchingIterator(source.getDeepCopyDataSource(env), onlySwitchAfterRow, copies, iflag); } - + @Override public Key getTopKey() { return key; } - + @Override public Value getTopValue() { return val; } - + @Override public boolean hasTop() { return key != null; } - + @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { throw new UnsupportedOperationException(); } - + @Override public void next() throws IOException { readNext(false); } - + private synchronized void 
readNext(boolean initialSeek) throws IOException { - + // check of initialSeek second is intentional so that it does not short // circuit the call to switchSource boolean seekNeeded = (!onlySwitchAfterRow && switchSource()) || initialSeek; - + if (seekNeeded) if (initialSeek) iter.seek(range, columnFamilies, inclusive); @@ -123,11 +128,11 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value iter.seek(new Range(key.followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive()), columnFamilies, inclusive); } } - + if (iter.hasTop()) { Key nextKey = iter.getTopKey(); Value nextVal = iter.getTopValue(); - + try { key = (Key) nextKey.clone(); } catch (CloneNotSupportedException e) { @@ -139,62 +144,62 @@ public class SourceSwitchingIterator implements SortedKeyValueIterator<Key,Value val = null; } } - + private boolean switchSource() throws IOException { while (!source.isCurrent()) { source = source.getNewDataSource(); iter = source.iterator(); if (iflag != null) ((InterruptibleIterator) iter).setInterruptFlag(iflag); - + return true; } - + return false; } - + @Override public synchronized void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { this.range = range; this.inclusive = inclusive; this.columnFamilies = columnFamilies; - + if (iter == null) { iter = source.iterator(); if (iflag != null) ((InterruptibleIterator) iter).setInterruptFlag(iflag); } - + readNext(true); } - + private synchronized void _switchNow() throws IOException { if (onlySwitchAfterRow) throw new IllegalStateException("Can only switch on row boundries"); - + if (switchSource()) { if (key != null) { iter.seek(new Range(key, true, range.getEndKey(), range.isEndKeyInclusive()), columnFamilies, inclusive); } } } - + public void switchNow() throws IOException { synchronized (copies) { for (SourceSwitchingIterator ssi : copies) ssi._switchNow(); } } - + @Override public synchronized void 
setInterruptFlag(AtomicBoolean flag) { if (copies.size() != 1) throw new IllegalStateException("setInterruptFlag() called after deep copies made " + copies.size()); - + this.iflag = flag; if (iter != null) ((InterruptibleIterator) iter).setInterruptFlag(flag); - + } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b0c3ba8c/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java index 2657bab..ca7cd86 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java @@ -28,50 +28,51 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; /*** - * SynchronizedIterator: wrap a SortedKeyValueIterator so that all of its methods are synchronized + * Wraps a SortedKeyValueIterator so that all of its methods are synchronized. The intent is that user iterators which are multi-threaded have the possibility + * to call parent methods concurrently. The SynchronizedIterator aims to reduce the likelihood of unwanted concurrent access. 
*/ public class SynchronizedIterator<K extends WritableComparable<?>,V extends Writable> implements SortedKeyValueIterator<K,V> { - + private SortedKeyValueIterator<K,V> source = null; - + @Override public synchronized void init(SortedKeyValueIterator<K,V> source, Map<String,String> options, IteratorEnvironment env) throws IOException { this.source = source; source.init(source, options, env); } - + @Override public synchronized boolean hasTop() { return source.hasTop(); } - + @Override public synchronized void next() throws IOException { source.next(); } - + @Override public synchronized void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { source.seek(range, columnFamilies, inclusive); } - + @Override public synchronized K getTopKey() { return source.getTopKey(); } - + @Override public synchronized V getTopValue() { return source.getTopValue(); } - + @Override public synchronized SortedKeyValueIterator<K,V> deepCopy(IteratorEnvironment env) { return new SynchronizedIterator<K,V>(source.deepCopy(env)); } - + public SynchronizedIterator() {} - + public SynchronizedIterator(SortedKeyValueIterator<K,V> source) { this.source = source; }