Author: ecn Date: Thu Jul 5 21:04:28 2012 New Revision: 1357915 URL: http://svn.apache.org/viewvc?rev=1357915&view=rev Log: ACCUMULO-665: patch from Josh Elser to properly pass column family information for intersecting iterators
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java?rev=1357915&r1=1357914&r2=1357915&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/OrIterator.java Thu Jul 5 21:04:28 2012 @@ -19,10 +19,12 @@ package org.apache.accumulo.core.iterato import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.Map; import java.util.PriorityQueue; +import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; @@ -47,15 +49,19 @@ public class OrIterator implements Sorte protected static class TermSource implements Comparable<TermSource> { public SortedKeyValueIterator<Key,Value> iter; public Text term; + public Collection<ByteSequence> seekColfams; public TermSource(TermSource other) { this.iter = other.iter; this.term = other.term; + this.seekColfams = other.seekColfams; } public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) { this.iter = iter; this.term = term; + // The desired column families for this source is the term itself + this.seekColfams = 
Collections.<ByteSequence>singletonList(new ArrayByteSequence(term.getBytes(), 0, term.getLength())); } public int compareTo(TermSource o) { @@ -143,7 +149,7 @@ public class OrIterator implements Sorte newRange = new Range((newKey == null) ? nullKey : newKey, true, range.getEndKey(), false); } } - currentTerm.iter.seek(newRange, columnFamilies, inclusive); + currentTerm.iter.seek(newRange, currentTerm.seekColfams, true); // If there is no top key // OR we are: @@ -166,7 +172,7 @@ public class OrIterator implements Sorte // because an Or must have at least two elements. if (currentTerm == null) { for (TermSource TS : sources) { - TS.iter.seek(range, columnFamilies, inclusive); + TS.iter.seek(range, TS.seekColfams, true); if ((TS.iter.hasTop()) && ((TS.term != null) && (TS.term.compareTo(TS.iter.getTopKey().getColumnFamily()) == 0))) sorted.add(TS); @@ -196,7 +202,8 @@ public class OrIterator implements Sorte } } - TS.iter.seek(newRange, columnFamilies, inclusive); + // Seek only to the term for this source as a column family + TS.iter.seek(newRange, TS.seekColfams, true); // If there is no top key // OR we are: Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java?rev=1357915&r1=1357914&r2=1357915&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java Thu Jul 5 21:04:28 2012 @@ -81,6 +81,13 @@ public interface SortedKeyValueIterator< * Iterators that examine groups of adjacent key/value pairs (e.g. rows) to determine their top key and value should be sure that they properly handle a seek * to a key in the middle of such a group (e.g. 
the middle of a row). Even if the client always seeks to a range containing an entire group (a,c), the tablet * server could send back a batch of entries corresponding to (a,b], then reseek the iterator to range (b,c) when the scan is continued. + * + * {@code columnFamilies} is used, at the lowest level, to determine which data blocks inside of an RFile need to be opened for this iterator. This set of data + * blocks is also the set of locality groups defined for the given table. If no columnFamilies are provided, the data blocks for all locality groups inside of + * the correct RFile will be opened and seeked in an attempt to find the correct start key, regardless of the startKey in the {@code range}. + * + * In an Accumulo instance in which multiple locality groups exist for a table, it is important to ensure that {@code columnFamilies} is properly set to the + * minimum required column families to ensure that data from separate locality groups is not inadvertently read. * * @param range * <tt>Range</tt> of keys to iterate over. 
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java?rev=1357915&r1=1357914&r2=1357915&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IndexedDocIterator.java Thu Jul 5 21:04:28 2012 @@ -31,6 +31,7 @@ import org.apache.accumulo.core.data.Ran import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.user.IntersectingIterator.TermSource; import org.apache.hadoop.io.Text; /** @@ -134,6 +135,10 @@ public class IndexedDocIterator extends docColf = new Text(options.get(docFamilyOptionName)); docSource = source.deepCopy(env); indexColfSet = Collections.singleton((ByteSequence) new ArrayByteSequence(indexColf.getBytes(), 0, indexColf.getLength())); + + for (TermSource ts : this.sources) { + ts.seekColfams = indexColfSet; + } } @Override @@ -143,7 +148,7 @@ public class IndexedDocIterator extends @Override public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException { - super.seek(range, indexColfSet, true); + super.seek(range, null, true); } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java?rev=1357915&r1=1357914&r2=1357915&view=diff ============================================================================== --- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/IntersectingIterator.java Thu Jul 5 21:04:28 2012 @@ -18,9 +18,11 @@ package org.apache.accumulo.core.iterato import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.Map; import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; @@ -48,6 +50,10 @@ import org.apache.log4j.Logger; * * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to parallelize the search over all shardIDs. * + * This iterator will *ignore* any columnFamilies passed to {@link #seek(Range, Collection, boolean)} as it performs intersections + * over terms. Extending classes should override the {@link TermSource#seekColfams} in their implementation's + * {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)} method. + * * README.shard in docs/examples shows an example of using the IntersectingIterator. 
*/ public class IntersectingIterator implements SortedKeyValueIterator<Key,Value> { @@ -83,24 +89,26 @@ public class IntersectingIterator implem protected static class TermSource { public SortedKeyValueIterator<Key,Value> iter; public Text term; + public Collection<ByteSequence> seekColfams; public boolean notFlag; public TermSource(TermSource other) { this.iter = other.iter; this.term = other.term; this.notFlag = other.notFlag; + this.seekColfams = other.seekColfams; } public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) { - this.iter = iter; - this.term = term; - this.notFlag = false; + this(iter, term, false); } public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term, boolean notFlag) { this.iter = iter; this.term = term; this.notFlag = notFlag; + // The desired column families for this source is the term itself + this.seekColfams = Collections.<ByteSequence>singletonList(new ArrayByteSequence(term.getBytes(), 0, term.getLength())); } public String getTermString() { @@ -121,10 +129,6 @@ public class IntersectingIterator implem protected Key topKey = null; protected Value value = new Value(emptyByteArray); - protected Collection<ByteSequence> seekColumnFamilies; - - protected boolean inclusive; - public IntersectingIterator() {} @Override @@ -196,7 +200,7 @@ public class IntersectingIterator implem if (partitionCompare > 0) { // seek to at least the currentRow Key seekKey = buildKey(currentPartition, sources[sourceID].term); - sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive); + sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true); continue; } // check if this source has gone beyond currentRow @@ -213,7 +217,7 @@ public class IntersectingIterator implem // if not, then seek forwards to the right columnFamily if (termCompare > 0) { Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID); - 
sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive); + sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true); continue; } // check if this source is beyond the right columnFamily @@ -235,7 +239,7 @@ public class IntersectingIterator implem if (docIDCompare > 0) { // seek forwards Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID); - sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive); + sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true); continue; } // if we are equal to the target, this is an invalid result. @@ -273,7 +277,7 @@ public class IntersectingIterator implem if (partitionCompare > 0) { // seek to at least the currentRow Key seekKey = buildKey(currentPartition, sources[sourceID].term); - sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive); + sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true); continue; } // check if this source has gone beyond currentRow @@ -294,7 +298,7 @@ public class IntersectingIterator implem // if not, then seek forwards to the right columnFamily if (termCompare > 0) { Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID); - sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive); + sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true); continue; } // check if this source is beyond the right columnFamily @@ -314,7 +318,7 @@ public class IntersectingIterator implem return true; } Key seekKey = buildFollowingPartitionKey(sources[sourceID].iter.getTopKey()); - sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive); + sources[sourceID].iter.seek(new Range(seekKey, 
true, null, false), sources[sourceID].seekColfams, true); continue; } } @@ -332,7 +336,7 @@ public class IntersectingIterator implem if (docIDCompare > 0) { // seek forwards Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID); - sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive); + sources[sourceID].iter.seek(new Range(seekKey, true, null, false), sources[sourceID].seekColfams, true); continue; } // this source is at the current row, in its column family, and at currentCQ @@ -485,8 +489,6 @@ public class IntersectingIterator implem currentPartition = new Text(); currentDocID.set(emptyByteArray); - this.seekColumnFamilies = seekColumnFamilies; - this.inclusive = inclusive; // seek each of the sources to the right column family within the row given by key for (int i = 0; i < sourcesCount; i++) { @@ -497,9 +499,11 @@ public class IntersectingIterator implem } else { sourceKey = buildKey(getPartition(range.getStartKey()), sources[i].term); } - sources[i].iter.seek(new Range(sourceKey, true, null, false), seekColumnFamilies, inclusive); + // Seek only to the term for this source as a column family + sources[i].iter.seek(new Range(sourceKey, true, null, false), sources[i].seekColfams, true); } else { - sources[i].iter.seek(range, seekColumnFamilies, inclusive); + // Seek only to the term for this source as a column family + sources[i].iter.seek(range, sources[i].seekColfams, true); } } advanceToIntersection();