Repository: accumulo Updated Branches: refs/heads/ACCUMULO-378 417b0b332 -> 4ac04b95d
ACCUMULO-2825 Add RowEncodingIterator The WholeRowIterator now extends the abstract RowEncodingIterator Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/73a9e6cf Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/73a9e6cf Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/73a9e6cf Branch: refs/heads/ACCUMULO-378 Commit: 73a9e6cfcad6c836ed9eb95421bc7306037739a9 Parents: 244c1ab Author: Ryan Leary <rle...@bbn.com> Authored: Sat May 17 21:03:03 2014 -0400 Committer: Ryan Leary <rle...@bbn.com> Committed: Sat May 17 21:10:52 2014 -0400 ---------------------------------------------------------------------- .../iterators/user/RowEncodingIterator.java | 172 +++++++++++++++++++ .../core/iterators/user/WholeRowIterator.java | 130 ++------------ 2 files changed, 191 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/73a9e6cf/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java new file mode 100644 index 0000000..dff1e04 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/RowEncodingIterator.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.iterators.user; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.hadoop.io.Text; + +/** + * + * The RowEncodingIterator is designed to provide row-isolation so that queries see mutations as atomic. It does so by encapsulating an entire row of key/value + * pairs into a single key/value pair, which is returned through the client as an atomic operation. + * + * <p> + * One caveat is that when seeking in a RowEncodingIterator using a range that starts at a non-inclusive first key in a row, (e.g. seek(new Range(new Key(new + * Text("row")),false,...),...)) this iterator will skip to the next row. This is done in order to prevent repeated scanning of the same row when the system + * automatically creates ranges of that form, which happens in the case of the client calling continueScan, or in the case of the tablet server continuing a + * scan after swapping out sources. + * + * <p> + * To regain the original key/value pairs of the row, call the rowDecoder function on the key/value pair that this iterator returned. 
+ * + * @see RowFilter + */ +public abstract class RowEncodingIterator implements SortedKeyValueIterator<Key,Value> { + + protected SortedKeyValueIterator<Key,Value> sourceIter; + private Key topKey = null; + private Value topValue = null; + + // decode a bunch of key value pairs that have been encoded into a single value + /** + * Given a value generated by the rowEncoder implementation, recreate the + * original Key, Value pairs. + * @param rowKey + * @param rowValue + * @return + * @throws IOException + */ + public abstract SortedMap<Key,Value> rowDecoder(Key rowKey, Value rowValue) throws IOException; + + /** + * Take a stream of keys and values. Return values in the same order + * encoded such that all portions of the key (except for the row value) + * and the original value are encoded in some way. + * @param keys + * @param values + * @return + * @throws IOException + */ + public abstract Value rowEncoder(List<Key> keys, List<Value> values) throws IOException; + + /** + * Implement deepCopy. Ensure sourceIter is copied appropriately. + */ + @Override + public abstract SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env); + + List<Key> keys = new ArrayList<Key>(); + List<Value> values = new ArrayList<Value>(); + + private void prepKeys() throws IOException { + if (topKey != null) + return; + Text currentRow; + do { + if (sourceIter.hasTop() == false) + return; + currentRow = new Text(sourceIter.getTopKey().getRow()); + keys.clear(); + values.clear(); + while (sourceIter.hasTop() && sourceIter.getTopKey().getRow().equals(currentRow)) { + keys.add(new Key(sourceIter.getTopKey())); + values.add(new Value(sourceIter.getTopValue())); + sourceIter.next(); + } + } while (!filter(currentRow, keys, values)); + + topKey = new Key(currentRow); + topValue = rowEncoder(keys, values); + } + + /** + * + * @param currentRow + * All keys have this in their row portion (do not modify!). 
+ * @param keys + * One key for each key in the row, ordered as they are given by the source iterator (do not modify!). + * @param values + * One value for each key in keys, ordered to correspond to the ordering in keys (do not modify!). + * @return true if we want to keep the row, false if we want to skip it + */ + protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) { + return true; + } + + @Override + public Key getTopKey() { + return topKey; + } + + @Override + public Value getTopValue() { + return topValue; + } + + @Override + public boolean hasTop() { + return topKey != null; + } + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { + sourceIter = source; + } + + @Override + public void next() throws IOException { + topKey = null; + topValue = null; + prepKeys(); + } + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + topKey = null; + topValue = null; + + Key sk = range.getStartKey(); + + if (sk != null && sk.getColumnFamilyData().length() == 0 && sk.getColumnQualifierData().length() == 0 && sk.getColumnVisibilityData().length() == 0 + && sk.getTimestamp() == Long.MAX_VALUE && !range.isStartKeyInclusive()) { + // assuming that we are seeking using a key previously returned by this iterator + // therefore go to the next row + Key followingRowKey = sk.followingKey(PartialKey.ROW); + if (range.getEndKey() != null && followingRowKey.compareTo(range.getEndKey()) > 0) + return; + + range = new Range(sk.followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive()); + } + + sourceIter.seek(range, columnFamilies, inclusive); + prepKeys(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/73a9e6cf/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java index 525f27c..c8bceea 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/WholeRowIterator.java @@ -21,21 +21,15 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.PartialKey; -import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.hadoop.io.Text; /** * @@ -53,19 +47,29 @@ import org.apache.hadoop.io.Text; * * @see RowFilter */ -public class WholeRowIterator implements SortedKeyValueIterator<Key,Value> { - - private SortedKeyValueIterator<Key,Value> sourceIter; - private Key topKey = null; - private Value topValue = null; - - public WholeRowIterator() { - - } +public class WholeRowIterator extends RowEncodingIterator { + public WholeRowIterator() {} WholeRowIterator(SortedKeyValueIterator<Key,Value> source) { this.sourceIter = source; } + + @Override + public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { + if (sourceIter != null) + return new WholeRowIterator(sourceIter.deepCopy(env)); + return new WholeRowIterator(); + } + + @Override + public SortedMap<Key, Value> rowDecoder(Key rowKey, Value rowValue) throws IOException { + return 
decodeRow(rowKey, rowValue); + } + + @Override + public Value rowEncoder(List<Key> keys, List<Value> values) throws IOException { + return encodeRow(keys, values); + } /** * Returns the byte array containing the field of row key from the given DataInputStream din. @@ -139,100 +143,4 @@ public class WholeRowIterator implements SortedKeyValueIterator<Key,Value> { return new Value(out.toByteArray()); } - - List<Key> keys = new ArrayList<Key>(); - List<Value> values = new ArrayList<Value>(); - - private void prepKeys() throws IOException { - if (topKey != null) - return; - Text currentRow; - do { - if (sourceIter.hasTop() == false) - return; - currentRow = new Text(sourceIter.getTopKey().getRow()); - keys.clear(); - values.clear(); - while (sourceIter.hasTop() && sourceIter.getTopKey().getRow().equals(currentRow)) { - keys.add(new Key(sourceIter.getTopKey())); - values.add(new Value(sourceIter.getTopValue())); - sourceIter.next(); - } - } while (!filter(currentRow, keys, values)); - - topKey = new Key(currentRow); - topValue = encodeRow(keys, values); - - } - - /** - * - * @param currentRow - * All keys have this in their row portion (do not modify!). - * @param keys - * One key for each key in the row, ordered as they are given by the source iterator (do not modify!). - * @param values - * One value for each key in keys, ordered to correspond to the ordering in keys (do not modify!). 
- * @return true if we want to keep the row, false if we want to skip it - */ - protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) { - return true; - } - - @Override - public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { - if (sourceIter != null) - return new WholeRowIterator(sourceIter.deepCopy(env)); - return new WholeRowIterator(); - } - - @Override - public Key getTopKey() { - return topKey; - } - - @Override - public Value getTopValue() { - return topValue; - } - - @Override - public boolean hasTop() { - return topKey != null; - } - - @Override - public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { - sourceIter = source; - } - - @Override - public void next() throws IOException { - topKey = null; - topValue = null; - prepKeys(); - } - - @Override - public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { - topKey = null; - topValue = null; - - Key sk = range.getStartKey(); - - if (sk != null && sk.getColumnFamilyData().length() == 0 && sk.getColumnQualifierData().length() == 0 && sk.getColumnVisibilityData().length() == 0 - && sk.getTimestamp() == Long.MAX_VALUE && !range.isStartKeyInclusive()) { - // assuming that we are seeking using a key previously returned by this iterator - // therefore go to the next row - Key followingRowKey = sk.followingKey(PartialKey.ROW); - if (range.getEndKey() != null && followingRowKey.compareTo(range.getEndKey()) > 0) - return; - - range = new Range(sk.followingKey(PartialKey.ROW), true, range.getEndKey(), range.isEndKeyInclusive()); - } - - sourceIter.seek(range, columnFamilies, inclusive); - prepKeys(); - } - }