Merge branch '1.4.5-SNAPSHOT' into 1.5.2-SNAPSHOT Conflicts: core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5413823d Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5413823d Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5413823d Branch: refs/heads/master Commit: 5413823d6ca91454c81a48730fa75633bd463745 Parents: 22f9559 88761e0 Author: Josh Elser <els...@apache.org> Authored: Tue Mar 25 13:55:54 2014 -0700 Committer: Josh Elser <els...@apache.org> Committed: Tue Mar 25 13:55:54 2014 -0700 ---------------------------------------------------------------------- .../accumulo/core/iterators/Combiner.java | 92 +++++++++++--------- 1 file changed, 51 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/5413823d/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java index 21cbfe5,0000000..58071cf mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/Combiner.java @@@ -1,313 -1,0 +1,323 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.iterators; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.IteratorSetting.Column; ++import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.conf.ColumnSet; ++import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +/** - * A SortedKeyValueIterator that combines the Values for different versions (timestamps) of a Key into a single Value. Combiner will replace one or more - * versions of a Key and their Values with the most recent Key and a Value which is the result of the reduce method. ++ * A SortedKeyValueIterator that combines the Values for different versions (timestamp) of a Key within a row into a single Value. Combiner will replace one or ++ * more versions of a Key and their Values with the most recent Key and a Value which is the result of the reduce method. An {@link IteratorSetting.Column} ++ * which only specifies a column family will combine all Keys in that column family individually. Similarly, a {@link IteratorSetting.Column} which specifies a ++ * column family and column qualifier will combine all Keys in column family and qualifier individually. Combination is only ever performed on multiple versions ++ * and not across column qualifiers or column visibilities. + * - * Subclasses must implement a reduce method: {@code public Value reduce(Key key, Iterator<Value> iter)}. ++ * Implementations must provide a reduce method: {@code public Value reduce(Key key, Iterator<Value> iter)}. + * + * This reduce method will be passed the most recent Key and an iterator over the Values for all non-deleted versions of that Key. A combiner will not combine + * keys that differ by more than the timestamp. ++ * ++ * This class and its implementations do not automatically filter out unwanted columns from those being combined, thus it is generally recommended to use a ++ * {@link Combiner} implementation with the {@link ScannerBase#fetchColumnFamily(Text)} or {@link ScannerBase#fetchColumn(Text, Text)} methods. + */ +public abstract class Combiner extends WrappingIterator implements OptionDescriber { + static final Logger log = Logger.getLogger(Combiner.class); + protected static final String COLUMNS_OPTION = "columns"; + protected static final String ALL_OPTION = "all"; - ++ + /** + * A Java Iterator that iterates over the Values for a given Key from a source SortedKeyValueIterator. + */ + public static class ValueIterator implements Iterator<Value> { + Key topKey; + SortedKeyValueIterator<Key,Value> source; + boolean hasNext; - ++ + /** + * Constructs an iterator over Values whose Keys are versions of the current topKey of the source SortedKeyValueIterator. + * + * @param source + * The SortedKeyValueIterator<Key,Value> from which to read data. + */ + public ValueIterator(SortedKeyValueIterator<Key,Value> source) { + this.source = source; + topKey = new Key(source.getTopKey()); + hasNext = _hasNext(); + } - ++ + private boolean _hasNext() { + return source.hasTop() && !source.getTopKey().isDeleted() && topKey.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS); + } - ++ + /** + * @return <tt>true</tt> if there is another Value + * + * @see java.util.Iterator#hasNext() + */ + @Override + public boolean hasNext() { + return hasNext; + } - ++ + /** + * @return the next Value + * + * @see java.util.Iterator#next() + */ + @Override + public Value next() { + if (!hasNext) + throw new NoSuchElementException(); + Value topValue = new Value(source.getTopValue()); + try { + source.next(); + hasNext = _hasNext(); + } catch (IOException e) { + throw new RuntimeException(e); + } + return topValue; + } - ++ + /** + * unsupported + * + * @see java.util.Iterator#remove() + */ + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } - ++ + Key topKey; + Value topValue; - ++ + @Override + public Key getTopKey() { + if (topKey == null) + return super.getTopKey(); + return topKey; + } - ++ + @Override + public Value getTopValue() { + if (topKey == null) + return super.getTopValue(); + return topValue; + } - ++ + @Override + public boolean hasTop() { + return topKey != null || super.hasTop(); + } - ++ + @Override + public void next() throws IOException { + if (topKey != null) { + topKey = null; + topValue = null; + } else { + super.next(); + } - ++ + findTop(); + } - ++ + private Key workKey = new Key(); - ++ + /** + * Sets the topKey and topValue based on the top key of the source. If the column of the source top key is in the set of combiners, topKey will be the top key + * of the source and topValue will be the result of the reduce method. Otherwise, topKey and topValue will be unchanged. (They are always set to null before + * this method is called.) + */ + private void findTop() throws IOException { + // check if aggregation is needed + if (super.hasTop()) { + workKey.set(super.getTopKey()); + if (combineAllColumns || combiners.contains(workKey)) { + if (workKey.isDeleted()) + return; + topKey = workKey; + Iterator<Value> viter = new ValueIterator(getSource()); + topValue = reduce(topKey, viter); + while (viter.hasNext()) + viter.next(); + } + } + } - ++ + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + // do not want to seek to the middle of a value that should be combined... - ++ + Range seekRange = IteratorUtil.maximizeStartKeyTimeStamp(range); - ++ + super.seek(seekRange, columnFamilies, inclusive); + findTop(); - ++ + if (range.getStartKey() != null) { + while (hasTop() && getTopKey().equals(range.getStartKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS) + && getTopKey().getTimestamp() > range.getStartKey().getTimestamp()) { + // the value has a more recent time stamp, so pass it up + // log.debug("skipping "+getTopKey()); + next(); + } - ++ + while (hasTop() && range.beforeStartKey(getTopKey())) { + next(); + } + } + } - ++ + /** + * Reduces a list of Values into a single Value. + * + * @param key + * The most recent version of the Key being reduced. + * + * @param iter + * An iterator over the Values for different versions of the key. + * + * @return The combined Value. + */ + public abstract Value reduce(Key key, Iterator<Value> iter); - ++ + private ColumnSet combiners; + private boolean combineAllColumns; - ++ + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { + super.init(source, options, env); - ++ + combineAllColumns = false; + if (options.containsKey(ALL_OPTION)) { + combineAllColumns = Boolean.parseBoolean(options.get(ALL_OPTION)); + if (combineAllColumns) + return; + } + if (!options.containsKey(COLUMNS_OPTION)) + throw new IllegalArgumentException("Must specify " + COLUMNS_OPTION + " option"); - ++ + String encodedColumns = options.get(COLUMNS_OPTION); + if (encodedColumns.length() == 0) + throw new IllegalArgumentException("The " + COLUMNS_OPTION + " must not be empty"); - ++ + combiners = new ColumnSet(Arrays.asList(encodedColumns.split(","))); + } - ++ + @Override + public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { + Combiner newInstance; + try { + newInstance = this.getClass().newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + newInstance.setSource(getSource().deepCopy(env)); + newInstance.combiners = combiners; + newInstance.combineAllColumns = combineAllColumns; + return newInstance; + } - ++ + @Override + public IteratorOptions describeOptions() { - IteratorOptions io = new IteratorOptions("comb", "Combiners apply reduce functions to values with identical keys", null, null); ++ IteratorOptions io = new IteratorOptions("comb", "Combiners apply reduce functions to multiple versions of values with otherwise equal keys", null, null); + io.addNamedOption(ALL_OPTION, "set to true to apply Combiner to every column, otherwise leave blank. if true, " + COLUMNS_OPTION + + " option will be ignored."); + io.addNamedOption(COLUMNS_OPTION, "<col fam>[:<col qual>]{,<col fam>[:<col qual>]} escape non-alphanum chars using %<hex>."); + return io; + } - ++ + @Override + public boolean validateOptions(Map<String,String> options) { + if (options.containsKey(ALL_OPTION)) { + try { + combineAllColumns = Boolean.parseBoolean(options.get(ALL_OPTION)); + } catch (Exception e) { + throw new IllegalArgumentException("bad boolean " + ALL_OPTION + ":" + options.get(ALL_OPTION)); + } + if (combineAllColumns) + return true; + } + if (!options.containsKey(COLUMNS_OPTION)) + throw new IllegalArgumentException("options must include " + ALL_OPTION + " or " + COLUMNS_OPTION); + + String encodedColumns = options.get(COLUMNS_OPTION); + if (encodedColumns.length() == 0) + throw new IllegalArgumentException("empty columns specified in option " + COLUMNS_OPTION); - ++ + for (String columns : encodedColumns.split(",")) { + if (!ColumnSet.isValidEncoding(columns)) + throw new IllegalArgumentException("invalid column encoding " + encodedColumns); + } - ++ + return true; + } - ++ + /** - * A convenience method to set which columns a combiner should be applied to. ++ * A convenience method to set which columns a combiner should be applied to. For each column specified, all versions of a Key which match that @{link ++ * IteratorSetting.Column} will be combined individually in each row. This method is likely to be used in conjunction with ++ * {@link ScannerBase#fetchColumnFamily(Text)} or {@link ScannerBase#fetchColumn(Text,Text)}. + * + * @param is + * iterator settings object to configure + * @param columns + * a list of columns to encode as the value for the combiner column configuration + */ - ++ + public static void setColumns(IteratorSetting is, List<IteratorSetting.Column> columns) { + String sep = ""; + StringBuilder sb = new StringBuilder(); - ++ + for (Column col : columns) { + sb.append(sep); + sep = ","; + sb.append(ColumnSet.encodeColumns(col.getFirst(), col.getSecond())); + } - ++ + is.addOption(COLUMNS_OPTION, sb.toString()); + } - ++ + /** - * A convenience method to set the "all columns" option on a Combiner. ++ * A convenience method to set the "all columns" option on a Combiner. This will combine all columns individually within each row. + * + * @param is + * iterator settings object to configure + * @param combineAllColumns + * if true, the columns option is ignored and the Combiner will be applied to all columns + */ + public static void setCombineAllColumns(IteratorSetting is, boolean combineAllColumns) { + is.addOption(ALL_OPTION, Boolean.toString(combineAllColumns)); + } +}